View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22
23  import java.io.IOException;
24  import java.util.ArrayList;
25  import java.util.List;
26  import java.util.NavigableMap;
27  import java.util.UUID;
28  import java.util.concurrent.Executors;
29  import java.util.concurrent.ScheduledExecutorService;
30  import java.util.concurrent.TimeUnit;
31
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.hbase.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
36  import org.apache.hadoop.conf.Configuration;
37  import org.apache.hadoop.fs.FileSystem;
38  import org.apache.hadoop.fs.Path;
39  import org.apache.hadoop.hbase.Cell;
40  import org.apache.hadoop.hbase.CellScanner;
41  import org.apache.hadoop.hbase.CellUtil;
42  import org.apache.hadoop.hbase.HConstants;
43  import org.apache.hadoop.hbase.Server;
44  import org.apache.hadoop.hbase.TableName;
45  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
46  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
47  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
48  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
49  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
50  import org.apache.hadoop.hbase.replication.ReplicationException;
51  import org.apache.hadoop.hbase.replication.ReplicationFactory;
52  import org.apache.hadoop.hbase.replication.ReplicationPeers;
53  import org.apache.hadoop.hbase.replication.ReplicationQueues;
54  import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
55  import org.apache.hadoop.hbase.replication.ReplicationTracker;
56  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
57  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
58  import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
59  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
60  import org.apache.hadoop.hbase.wal.WALKey;
61  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
62  import org.apache.zookeeper.KeeperException;
63
64  import com.google.common.util.concurrent.ThreadFactoryBuilder;
65
66  /**
67   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
68   */
69  @InterfaceAudience.Private
70  public class Replication extends WALActionsListener.Base implements
71    ReplicationSourceService, ReplicationSinkService {
72    private static final Log LOG =
73        LogFactory.getLog(Replication.class);
74    private boolean replicationForBulkLoadData;
75    private ReplicationSourceManager replicationManager;
76    private ReplicationQueues replicationQueues;
77    private ReplicationPeers replicationPeers;
78    private ReplicationTracker replicationTracker;
79    private Configuration conf;
80    private ReplicationSink replicationSink;
81    // Hosting server
82    private Server server;
83    /** Statistics thread schedule pool */
84    private ScheduledExecutorService scheduleThreadPool;
85    private int statsThreadPeriod;
86    // ReplicationLoad to access replication metrics
87    private ReplicationLoad replicationLoad;
88    /**
89     * Instantiate the replication management (if rep is enabled).
90     * @param server Hosting server
91     * @param fs handle to the filesystem
92     * @param logDir
93     * @param oldLogDir directory where logs are archived
94     * @throws IOException
95     */
96    public Replication(final Server server, final FileSystem fs,
97        final Path logDir, final Path oldLogDir) throws IOException{
98      initialize(server, fs, logDir, oldLogDir);
99    }
100
101   /**
102    * Empty constructor
103    */
104   public Replication() {
105   }
106
107   public void initialize(final Server server, final FileSystem fs,
108       final Path logDir, final Path oldLogDir) throws IOException {
109     this.server = server;
110     this.conf = this.server.getConfiguration();
111     this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
112     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
113       new ThreadFactoryBuilder()
114         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
115         .setDaemon(true)
116         .build());
117     if (this.replicationForBulkLoadData) {
118       if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
119           || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
120         throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
121             + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
122             + " is set to true.");
123       }
124     }
125
126     try {
127       this.replicationQueues =
128           ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
129             server.getZooKeeper()));
130       this.replicationQueues.init(this.server.getServerName().toString());
131       this.replicationPeers =
132           ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
133       this.replicationPeers.init();
134       this.replicationTracker =
135           ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
136             this.conf, this.server, this.server);
137     } catch (Exception e) {
138       throw new IOException("Failed replication handler create", e);
139     }
140     UUID clusterId = null;
141     try {
142       clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
143     } catch (KeeperException ke) {
144       throw new IOException("Could not read cluster id", ke);
145     }
146     this.replicationManager =
147         new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
148             conf, this.server, fs, logDir, oldLogDir, clusterId);
149     this.statsThreadPeriod =
150         this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
151     LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
152     this.replicationLoad = new ReplicationLoad();
153   }
154
155   /**
156    * @param c Configuration to look at
157    * @return True if replication for bulk load data is enabled.
158    */
159   public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
160     return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
161       HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
162   }
163
164    /*
165     * Returns an object to listen to new wal changes
166     **/
167   public WALActionsListener getWALActionsListener() {
168     return this;
169   }
170   /**
171    * Stops replication service.
172    */
173   public void stopReplicationService() {
174     join();
175   }
176
177   /**
178    * Join with the replication threads
179    */
180   public void join() {
181     this.replicationManager.join();
182     if (this.replicationSink != null) {
183       this.replicationSink.stopReplicationSinkServices();
184     }
185     scheduleThreadPool.shutdown();
186   }
187
188   /**
189    * Carry on the list of log entries down to the sink
190    * @param entries list of entries to replicate
191    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
192    *          contain the Cells we are replicating; they are passed here on the side in this
193    *          CellScanner).
194    * @param replicationClusterId Id which will uniquely identify source cluster FS client
195    *          configurations in the replication configuration directory
196    * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
197    *          directory required for replicating hfiles
198    * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
199    * @throws IOException
200    */
201   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
202       String replicationClusterId, String sourceBaseNamespaceDirPath,
203       String sourceHFileArchiveDirPath) throws IOException {
204     this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
205       sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
206   }
207
208   /**
209    * If replication is enabled and this cluster is a master,
210    * it starts
211    * @throws IOException
212    */
213   public void startReplicationService() throws IOException {
214     try {
215       this.replicationManager.init();
216     } catch (ReplicationException e) {
217       throw new IOException(e);
218     }
219     this.replicationSink = new ReplicationSink(this.conf, this.server);
220     this.scheduleThreadPool.scheduleAtFixedRate(
221       new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
222       statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
223   }
224
225   /**
226    * Get the replication sources manager
227    * @return the manager if replication is enabled, else returns false
228    */
229   public ReplicationSourceManager getReplicationManager() {
230     return this.replicationManager;
231   }
232
233   @Override
234   public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
235     scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
236   }
237
238   @Override
239   public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
240       final WALEdit edit) throws IOException {
241     NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
242     if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
243       TableName tableName = logKey.getTablename();
244       for (Cell c : edit.getCells()) {
245         // Only check for bulk load events
246         if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
247           BulkLoadDescriptor bld = null;
248           try {
249             bld = WALEdit.getBulkLoadDescriptor(c);
250           } catch (IOException e) {
251             LOG.error("Failed to get bulk load events information from the wal file.", e);
252             throw e;
253           }
254
255           for (StoreDescriptor s : bld.getStoresList()) {
256             byte[] fam = s.getFamilyName().toByteArray();
257             if (scopes.containsKey(fam)) {
258               addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
259             }
260           }
261         }
262       }
263     }
264   }
265
266   /**
267    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
268    * compaction WAL edits and if the scope is local.
269    * @param logKey Key that may get scoped according to its edits
270    * @param logEdit Edits used to lookup the scopes
271    * @param replicationManager Manager used to add bulk load events hfile references
272    * @throws IOException If failed to parse the WALEdit
273    */
274   public static void scopeWALEdits(WALKey logKey,
275       WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
276           throws IOException {
277     boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
278     boolean foundOtherEdits = false;
279     for (Cell cell : logEdit.getCells()) {
280       if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
281         foundOtherEdits = true;
282       }
283     }
284     if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
285       logKey.serializeReplicationScope(false);
286     }
287   }
288
289   private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
290       TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
291     try {
292       replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
293     } catch (ReplicationException e) {
294       LOG.error("Failed to add hfile references in the replication queue.", e);
295       throw new IOException(e);
296     }
297   }
298
299   @Override
300   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
301     getReplicationManager().preLogRoll(newPath);
302   }
303
304   @Override
305   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
306     getReplicationManager().postLogRoll(newPath);
307   }
308
309   /**
310    * This method modifies the master's configuration in order to inject replication-related features
311    * @param conf
312    */
313   public static void decorateMasterConfiguration(Configuration conf) {
314     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
315     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
316     if (!plugins.contains(cleanerClass)) {
317       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
318     }
319     if (isReplicationForBulkLoadDataEnabled(conf)) {
320       plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
321       cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
322       if (!plugins.contains(cleanerClass)) {
323         conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
324       }
325     }
326   }
327
328   /*
329    * Statistics thread. Periodically prints the cache statistics to the log.
330    */
331   static class ReplicationStatisticsThread extends Thread {
332
333     private final ReplicationSink replicationSink;
334     private final ReplicationSourceManager replicationManager;
335
336     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
337                             final ReplicationSourceManager replicationManager) {
338       super("ReplicationStatisticsThread");
339       this.replicationManager = replicationManager;
340       this.replicationSink = replicationSink;
341     }
342
343     @Override
344     public void run() {
345       printStats(this.replicationManager.getStats());
346       printStats(this.replicationSink.getStats());
347     }
348
349     private void printStats(String stats) {
350       if (!stats.isEmpty()) {
351         LOG.info(stats);
352       }
353     }
354   }
355
356   @Override
357   public ReplicationLoad refreshAndGetReplicationLoad() {
358     if (this.replicationLoad == null) {
359       return null;
360     }
361     // always build for latest data
362     buildReplicationLoad();
363     return this.replicationLoad;
364   }
365
366   private void buildReplicationLoad() {
367     // get source
368     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
369     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
370
371     for (ReplicationSourceInterface source : sources) {
372       if (source instanceof ReplicationSource) {
373         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
374       }
375     }
376     // get sink
377     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
378     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
379   }
380 }