/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.cleaner.HFileCleaner;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.zookeeper.KeeperException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
 * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
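 * <p>
 * A hedged sketch of the expected lifecycle (the {@code server}, {@code fs} and directory
 * arguments are placeholders supplied by the hosting region server, not values defined here):
 * <pre>{@code
 *   Replication replication = new Replication();
 *   replication.initialize(server, fs, logDir, oldLogDir); // wire up queues, peers, tracker
 *   // register the listener so WAL appends get their replication scopes set
 *   WALActionsListener listener = replication.getWALActionsListener();
 *   replication.startReplicationService();                 // start sources and the sink
 *   // ... on shutdown ...
 *   replication.stopReplicationService();                  // join sources, stop the sink
 * }</pre>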
 */
@InterfaceAudience.Private
public class Replication extends WALActionsListener.Base implements
  ReplicationSourceService, ReplicationSinkService {
  private static final Log LOG =
      LogFactory.getLog(Replication.class);
  private boolean replicationForBulkLoadData;
  private ReplicationSourceManager replicationManager;
  private ReplicationQueues replicationQueues;
  private ReplicationPeers replicationPeers;
  private ReplicationTracker replicationTracker;
  private Configuration conf;
  private ReplicationSink replicationSink;
  // Hosting server
  private Server server;
  /** Statistics thread schedule pool */
  private ScheduledExecutorService scheduleThreadPool;
  private int statsThreadPeriod;
  // ReplicationLoad to access replication metrics
  private ReplicationLoad replicationLoad;

  /**
   * Instantiate the replication management (if rep is enabled).
   * @param server Hosting server
   * @param fs handle to the filesystem
   * @param logDir directory where the WALs are written
   * @param oldLogDir directory where logs are archived
   * @throws IOException if the replication machinery cannot be set up
   */
  public Replication(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    initialize(server, fs, logDir, oldLogDir);
  }

  /**
   * Empty constructor
   */
  public Replication() {
  }

  public void initialize(final Server server, final FileSystem fs,
      final Path logDir, final Path oldLogDir) throws IOException {
    this.server = server;
    this.conf = this.server.getConfiguration();
    this.replicationForBulkLoadData = isReplicationForBulkLoadDataEnabled(this.conf);
    this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
      new ThreadFactoryBuilder()
        .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
        .setDaemon(true)
        .build());
    if (this.replicationForBulkLoadData) {
      if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
          || conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
        throw new IllegalArgumentException(HConstants.REPLICATION_CLUSTER_ID
            + " cannot be null/empty when " + HConstants.REPLICATION_BULKLOAD_ENABLE_KEY
            + " is set to true.");
      }
    }

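    // The replication queues, peers and tracker below are all ZooKeeper-backed; any failure
    // while creating them is rethrown as an IOException.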
    try {
      this.replicationQueues =
          ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, this.server,
            server.getZooKeeper()));
      this.replicationQueues.init(this.server.getServerName().toString());
      this.replicationPeers =
          ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
      this.replicationPeers.init();
      this.replicationTracker =
          ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
            this.conf, this.server, this.server);
    } catch (Exception e) {
      throw new IOException("Failed replication handler create", e);
    }
    UUID clusterId = null;
    try {
      clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
    } catch (KeeperException ke) {
      throw new IOException("Could not read cluster id", ke);
    }
    this.replicationManager =
        new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
            conf, this.server, fs, logDir, oldLogDir, clusterId);
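    // The statistics thread period is configurable via "replication.stats.thread.period.seconds"
    // and defaults to 300 seconds (5 minutes).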
    this.statsThreadPeriod =
        this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
    LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
    this.replicationLoad = new ReplicationLoad();
  }

  /**
   * @param c Configuration to look at
   * @return True if replication for bulk load data is enabled.
   */
  public static boolean isReplicationForBulkLoadDataEnabled(final Configuration c) {
    return c.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
      HConstants.REPLICATION_BULKLOAD_ENABLE_DEFAULT);
  }

  /**
   * Returns an object to listen to new wal changes
   */
  public WALActionsListener getWALActionsListener() {
    return this;
  }

  /**
   * Stops replication service.
   */
  public void stopReplicationService() {
    join();
  }

  /**
   * Join with the replication threads
   */
  public void join() {
    this.replicationManager.join();
    if (this.replicationSink != null) {
      this.replicationSink.stopReplicationSinkServices();
    }
    scheduleThreadPool.shutdown();
  }

  /**
   * Carry the list of log entries down to the sink
   * @param entries list of entries to replicate
   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
   *          contain the Cells we are replicating; they are passed here on the side in this
   *          CellScanner).
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory required for replicating hfiles
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
   * @throws IOException if replicating the entries to the sink fails
   */
  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
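    // Delegates to the replication sink, which applies the shipped edits (and, for bulk load
    // events, the referenced hfiles) on this cluster.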
    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
  }

  /**
   * Starts the replication service: initializes the replication source manager, creates the
   * replication sink and schedules the statistics thread.
   * @throws IOException if the source manager fails to initialize
   */
  public void startReplicationService() throws IOException {
    try {
      this.replicationManager.init();
    } catch (ReplicationException e) {
      throw new IOException(e);
    }
    this.replicationSink = new ReplicationSink(this.conf, this.server);
    this.scheduleThreadPool.scheduleAtFixedRate(
      new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
      statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
  }

  /**
   * Get the replication sources manager
   * @return the replication sources manager
   */
  public ReplicationSourceManager getReplicationManager() {
    return this.replicationManager;
  }

  @Override
  public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
    scopeWALEdits(logKey, logEdit, this.conf, this.getReplicationManager());
  }

  @Override
  public void postAppend(long entryLen, long elapsedTimeMillis, final WALKey logKey,
      final WALEdit edit) throws IOException {
    NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
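    // Bulk load events are written into the WAL as METAFAMILY edits carrying a BulkLoadDescriptor;
    // when bulk load replication is enabled, the hfiles they reference for replicated families
    // must also be queued so the sources can ship them.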
    if (this.replicationForBulkLoadData && scopes != null && !scopes.isEmpty()) {
      TableName tableName = logKey.getTablename();
      for (Cell c : edit.getCells()) {
        // Only check for bulk load events
        if (CellUtil.matchingQualifier(c, WALEdit.BULK_LOAD)) {
          BulkLoadDescriptor bld = null;
          try {
            bld = WALEdit.getBulkLoadDescriptor(c);
          } catch (IOException e) {
            LOG.error("Failed to get bulk load events information from the wal file.", e);
            throw e;
          }

          for (StoreDescriptor s : bld.getStoresList()) {
            byte[] fam = s.getFamilyName().toByteArray();
            if (scopes.containsKey(fam)) {
              addHFileRefsToQueue(this.getReplicationManager(), tableName, fam, s);
            }
          }
        }
      }
    }
  }

  /**
   * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
   * from compaction WAL edits or when the scope is local.
   * @param logKey Key that may get scoped according to its edits
   * @param logEdit Edits used to lookup the scopes
   * @param conf Configuration used to check whether replication for bulk load data is enabled
   * @param replicationManager Manager used to add hfile references for bulk load events
   * @throws IOException If failed to parse the WALEdit
   */
  public static void scopeWALEdits(WALKey logKey,
      WALEdit logEdit, Configuration conf, ReplicationSourceManager replicationManager)
          throws IOException {
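    // Scopes only need to be serialized when there is something to ship: a regular
    // (non-METAFAMILY) cell, a region close marker, or bulk load replication being enabled.
    // Replayed edits never carry scopes.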
    boolean replicationForBulkLoadEnabled = isReplicationForBulkLoadDataEnabled(conf);
    boolean foundOtherEdits = false;
    for (Cell cell : logEdit.getCells()) {
      if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
        foundOtherEdits = true;
        break;
      }
    }

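    // A region close event is the one METAFAMILY-only edit that still needs scopes:
    // serial replication reads them back from the close marker.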
    if (!foundOtherEdits && logEdit.getCells().size() > 0) {
      WALProtos.RegionEventDescriptor maybeEvent =
          WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
      if (maybeEvent != null && (maybeEvent.getEventType() ==
          WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
        // In serial replication, we use scopes when reading the close marker.
        foundOtherEdits = true;
      }
    }
    if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
      logKey.serializeReplicationScope(false);
    }
  }

  private static void addHFileRefsToQueue(ReplicationSourceManager replicationManager,
      TableName tableName, byte[] family, StoreDescriptor s) throws IOException {
    try {
      replicationManager.addHFileRefs(tableName, family, s.getStoreFileList());
    } catch (ReplicationException e) {
      LOG.error("Failed to add hfile references in the replication queue.", e);
      throw new IOException(e);
    }
  }

  @Override
  public void preLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().preLogRoll(newPath);
  }

  @Override
  public void postLogRoll(Path oldPath, Path newPath) throws IOException {
    getReplicationManager().postLogRoll(newPath);
  }

  /**
   * This method modifies the master's configuration in order to inject replication-related
   * features
   * @param conf the master's configuration to decorate
   */
  public static void decorateMasterConfiguration(Configuration conf) {
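    // Append the replication-aware log cleaner to the master's log cleaner plugins so WALs that
    // are still queued for replication are not deleted out from under the sources.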
    String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
    String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
    if (!plugins.contains(cleanerClass)) {
      conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
    }
    if (isReplicationForBulkLoadDataEnabled(conf)) {
      plugins = conf.get(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS);
      cleanerClass = ReplicationHFileCleaner.class.getCanonicalName();
      if (!plugins.contains(cleanerClass)) {
        conf.set(HFileCleaner.MASTER_HFILE_CLEANER_PLUGINS, plugins + "," + cleanerClass);
      }
    }
  }

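  // A hedged usage sketch (the call site is assumed, not shown in this class): the master is
  // expected to decorate its configuration once at startup, e.g.
  //   Configuration conf = HBaseConfiguration.create();
  //   Replication.decorateMasterConfiguration(conf);
  // after which the log cleaner chain contains ReplicationLogCleaner and, when bulk load
  // replication is enabled, the hfile cleaner chain contains ReplicationHFileCleaner.
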
  /*
   * Statistics thread. Periodically prints the replication statistics to the log.
   */
  static class ReplicationStatisticsThread extends Thread {

    private final ReplicationSink replicationSink;
    private final ReplicationSourceManager replicationManager;

    public ReplicationStatisticsThread(final ReplicationSink replicationSink,
                            final ReplicationSourceManager replicationManager) {
      super("ReplicationStatisticsThread");
      this.replicationManager = replicationManager;
      this.replicationSink = replicationSink;
    }

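    // run() logs a single snapshot of the source and sink stats; the periodic behaviour comes
    // from the scheduled executor set up in startReplicationService().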
    @Override
    public void run() {
      printStats(this.replicationManager.getStats());
      printStats(this.replicationSink.getStats());
    }

    private void printStats(String stats) {
      if (!stats.isEmpty()) {
        LOG.info(stats);
      }
    }
  }

  @Override
  public ReplicationLoad refreshAndGetReplicationLoad() {
    if (this.replicationLoad == null) {
      return null;
    }
    // always build for latest data
    buildReplicationLoad();
    return this.replicationLoad;
  }

  private void buildReplicationLoad() {
    // get source
    List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
    List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();

    for (ReplicationSourceInterface source : sources) {
      if (source instanceof ReplicationSource) {
        sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
      }
    }
    // get sink
    MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
    this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
  }
}