View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23  
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.List;
27  import java.util.NavigableMap;
28  import java.util.UUID;
29  import java.util.concurrent.Executors;
30  import java.util.concurrent.ScheduledExecutorService;
31  import java.util.concurrent.TimeUnit;
32  
33  import org.apache.commons.logging.Log;
34  import org.apache.commons.logging.LogFactory;
35  import org.apache.hadoop.hbase.classification.InterfaceAudience;
36  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellScanner;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.Server;
45  import org.apache.hadoop.hbase.TableName;
46  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
47  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
48  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
49  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
50  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
51  import org.apache.hadoop.hbase.wal.WALKey;
52  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
53  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54  import org.apache.hadoop.hbase.replication.ReplicationException;
55  import org.apache.hadoop.hbase.replication.ReplicationFactory;
56  import org.apache.hadoop.hbase.replication.ReplicationPeers;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.ReplicationTracker;
59  import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
60  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
61  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
62  import org.apache.zookeeper.KeeperException;
63  
64  import com.google.common.util.concurrent.ThreadFactoryBuilder;
65  
66  /**
67   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
68   */
69  @InterfaceAudience.Private
70  public class Replication extends WALActionsListener.Base implements
71    ReplicationSourceService, ReplicationSinkService {
72    private static final Log LOG =
73        LogFactory.getLog(Replication.class);
74    private boolean replication;
75    private boolean replicationForBulkLoadData;
76    private ReplicationSourceManager replicationManager;
77    private ReplicationQueues replicationQueues;
78    private ReplicationPeers replicationPeers;
79    private ReplicationTracker replicationTracker;
80    private Configuration conf;
81    private ReplicationSink replicationSink;
82    // Hosting server
83    private Server server;
84    /** Statistics thread schedule pool */
85    private ScheduledExecutorService scheduleThreadPool;
86    private int statsThreadPeriod;
87    // ReplicationLoad to access replication metrics
88    private ReplicationLoad replicationLoad;
89    /**
90     * Instantiate the replication management (if rep is enabled).
91     * @param server Hosting server
92     * @param fs handle to the filesystem
93     * @param logDir
94     * @param oldLogDir directory where logs are archived
95     * @throws IOException
96     */
97    public Replication(final Server server, final FileSystem fs,
98        final Path logDir, final Path oldLogDir) throws IOException{
99      initialize(server, fs, logDir, oldLogDir);
100   }
101 
102   /**
103    * Empty constructor
104    */
105   public Replication() {
106   }
107 
108   public void initialize(final Server server, final FileSystem fs,
109       final Path logDir, final Path oldLogDir) throws IOException {
110     this.server = server;
111     this.conf = this.server.getConfiguration();
112     this.replication = isReplication(this.conf);
113     this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
114     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
115       new ThreadFactoryBuilder()
116         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
117         .setDaemon(true)
118         .build());
119     if (this.replicationForBulkLoadData) {
120       if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
121           || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
122         throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
123             + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
124             + " is set to true.");
125       }
126     }
127     if (replication) {
128       try {
129         this.replicationQueues =
130             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
131         this.replicationQueues.init(this.server.getServerName().toString());
132         this.replicationPeers =
133             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
134         this.replicationPeers.init();
135         this.replicationTracker =
136             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
137               this.conf, this.server, this.server);
138       } catch (ReplicationException e) {
139         throw new IOException("Failed replication handler create", e);
140       }
141       UUID clusterId = null;
142       try {
143         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
144       } catch (KeeperException ke) {
145         throw new IOException("Could not read cluster id", ke);
146       }
147       this.replicationManager =
148           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
149               conf, this.server, fs, logDir, oldLogDir, clusterId);
150       this.statsThreadPeriod =
151           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
152       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
153       this.replicationLoad = new ReplicationLoad();
154     } else {
155       this.replicationManager = null;
156       this.replicationQueues = null;
157       this.replicationPeers = null;
158       this.replicationTracker = null;
159       this.replicationLoad = null;
160     }
161   }
162 
163    /**
164     * @param c Configuration to look at
165     * @return True if replication is enabled.
166     */
167   public static boolean isReplication(final Configuration c) {
168     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
169   }
170 
171   /**
172    * @param c Configuration to look at
173    * @return True if replication for bulk load data is enabled.
174    */
175   public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
176     return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
177       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
178   }
179 
180    /*
181     * Returns an object to listen to new wal changes
182     **/
183   public WALActionsListener getWALActionsListener() {
184     return this;
185   }
186   /**
187    * Stops replication service.
188    */
189   public void stopReplicationService() {
190     join();
191   }
192 
193   /**
194    * Join with the replication threads
195    */
196   public void join() {
197     if (this.replication) {
198       this.replicationManager.join();
199       if (this.replicationSink != null) {
200         this.replicationSink.stopReplicationSinkServices();
201       }
202     }
203     scheduleThreadPool.shutdown();
204   }
205 
206   /**
207    * Carry on the list of log entries down to the sink
208    * @param entries list of entries to replicate
209    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
210    *          contain the Cells we are replicating; they are passed here on the side in this
211    *          CellScanner).
212    * @param replicationClusterId Id which will uniquely identify source cluster FS client
213    *          configurations in the replication configuration directory
214    * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
215    *          directory required for replicating hfiles
216    * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
217    * @throws IOException
218    */
219   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
220       String replicationClusterId, String sourceBaseNamespaceDirPath,
221       String sourceHFileArchiveDirPath) throws IOException {
222     if (this.replication) {
223       this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
224         sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
225     }
226   }
227 
228   /**
229    * If replication is enabled and this cluster is a master,
230    * it starts
231    * @throws IOException
232    */
233   public void startReplicationService() throws IOException {
234     if (this.replication) {
235       try {
236         this.replicationManager.init();
237       } catch (ReplicationException e) {
238         throw new IOException(e);
239       }
240       this.replicationSink = new ReplicationSink(this.conf, this.server);
241       this.scheduleThreadPool.scheduleAtFixedRate(
242         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
243         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
244     }
245   }
246 
247   /**
248    * Get the replication sources manager
249    * @return the manager if replication is enabled, else returns false
250    */
251   public ReplicationSourceManager getReplicationManager() {
252     return this.replicationManager;
253   }
254 
255   @Override
256   public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
257     scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
258   }
259 
260   @Override
261   public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
262       final WALEdit edit) throws IOException {
263     NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
264     if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
265       TableName tableName = logKey.getTablename();
266       for (Cell c : edit.getCells()) {
267         // Only check for bulk load events
268         if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
269           BulkLoadDescriptor bld = null;
270           try {
271             bld = WALEdit.getBulkLoadDescriptor(c);
272           } catch (IOException e) {
273             LOG.error("Failed to get bulk load events information from the wal file.", e);
274             throw e;
275           }
276 
277           for (StoreDescriptor s : bld.getStoresList()) {
278             byte[] fam = s.getFamilyName().toByteArray();
279             if (scopes.containsKey(fam)) {
280               addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
281             }
282           }
283         }
284       }
285     }
286   }
287 
288   /**
289    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
290    * compaction WAL edits and if the scope is local.
291    * @param logKey Key that may get scoped according to its edits
292    * @param logEdit Edits used to lookup the scopes
293    * @param replicationManager Manager used to add bulk load events hfile references
294    * @throws IOException If failed to parse the WALEdit
295    */
296   public static void scopeWALEdits(WALKey logKey,
297       WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
298           throws IOException {
299     boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
300     boolean foundOtherEdits = false;
301     for (Cell cell : logEdit.getCells()) {
302       if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
303         foundOtherEdits = true;
304       }
305     }
306     if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
307       logKey.serializeReplicationScope(false);
308     }
309   }
310 
311   private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
312       TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
313     try {
314       replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
315     } catch (ReplicationException e) {
316       LOG.error("Failed to add hfile references in the replication queue.", e);
317       throw new IOException(e);
318     }
319   }
320 
321   @Override
322   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
323     getReplicationManager().preLogRoll(newPath);
324   }
325 
326   @Override
327   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
328     getReplicationManager().postLogRoll(newPath);
329   }
330 
331   /**
332    * This method modifies the master's configuration in order to inject replication-related features
333    * @param conf
334    */
335   public static void decorateMasterConfiguration(Configuration conf) {
336     if (!isReplication(conf)) {
337       return;
338     }
339     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
340     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
341     if (!plugins.contains(cleanerClass)) {
342       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
343     }
344     if (isReplicationForBulkLoadDataEnabled(conf)) {
345       plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
346       cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
347       if (!plugins.contains(cleanerClass)) {
348         conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
349       }
350     }
351   }
352 
353   /*
354    * Statistics thread. Periodically prints the cache statistics to the log.
355    */
356   static class ReplicationStatisticsThread extends Thread {
357 
358     private final ReplicationSink replicationSink;
359     private final ReplicationSourceManager replicationManager;
360 
361     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
362                             final ReplicationSourceManager replicationManager) {
363       super("ReplicationStatisticsThread");
364       this.replicationManager = replicationManager;
365       this.replicationSink = replicationSink;
366     }
367 
368     @Override
369     public void run() {
370       printStats(this.replicationManager.getStats());
371       printStats(this.replicationSink.getStats());
372     }
373 
374     private void printStats(String stats) {
375       if (!stats.isEmpty()) {
376         LOG.info(stats);
377       }
378     }
379   }
380 
381   @Override
382   public ReplicationLoad refreshAndGetReplicationLoad() {
383     if (this.replicationLoad == null) {
384       return null;
385     }
386     // always build for latest data
387     buildReplicationLoad();
388     return this.replicationLoad;
389   }
390 
391   private void buildReplicationLoad() {
392     // get source
393     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
394     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
395 
396     for (ReplicationSourceInterface source : sources) {
397       if (source instanceof ReplicationSource) {
398         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
399       }
400     }
401     // get sink
402     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
403     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
404   }
405 }