View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.NavigableMap;
29  import java.util.TreeMap;
30  import java.util.UUID;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.Server;
47  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
48  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
49  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
50  import org.apache.hadoop.hbase.wal.WALKey;
51  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
52  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53  import org.apache.hadoop.hbase.replication.ReplicationException;
54  import org.apache.hadoop.hbase.replication.ReplicationFactory;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
61  import org.apache.zookeeper.KeeperException;
62  
63  import com.google.common.util.concurrent.ThreadFactoryBuilder;
64  
65  /**
66   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
67   */
68  @InterfaceAudience.Private
69  public class Replication extends WALActionsListener.Base implements
70    ReplicationSourceService, ReplicationSinkService {
71    private static final Log LOG =
72        LogFactory.getLog(Replication.class);
73    private boolean replication;
74    private ReplicationSourceManager replicationManager;
75    private ReplicationQueues replicationQueues;
76    private ReplicationPeers replicationPeers;
77    private ReplicationTracker replicationTracker;
78    private Configuration conf;
79    private ReplicationSink replicationSink;
80    // Hosting server
81    private Server server;
82    /** Statistics thread schedule pool */
83    private ScheduledExecutorService scheduleThreadPool;
84    private int statsThreadPeriod;
85    // ReplicationLoad to access replication metrics
86    private ReplicationLoad replicationLoad;
87  
88    /**
89     * Instantiate the replication management (if rep is enabled).
90     * @param server Hosting server
91     * @param fs handle to the filesystem
92     * @param logDir
93     * @param oldLogDir directory where logs are archived
94     * @throws IOException
95     */
96    public Replication(final Server server, final FileSystem fs,
97        final Path logDir, final Path oldLogDir) throws IOException{
98      initialize(server, fs, logDir, oldLogDir);
99    }
100 
101   /**
102    * Empty constructor
103    */
104   public Replication() {
105   }
106 
107   public void initialize(final Server server, final FileSystem fs,
108       final Path logDir, final Path oldLogDir) throws IOException {
109     this.server = server;
110     this.conf = this.server.getConfiguration();
111     this.replication = isReplication(this.conf);
112     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
113       new ThreadFactoryBuilder()
114         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
115         .setDaemon(true)
116         .build());
117     if (replication) {
118       try {
119         this.replicationQueues =
120             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
121         this.replicationQueues.init(this.server.getServerName().toString());
122         this.replicationPeers =
123             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
124         this.replicationPeers.init();
125         this.replicationTracker =
126             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
127               this.conf, this.server, this.server);
128       } catch (ReplicationException e) {
129         throw new IOException("Failed replication handler create", e);
130       }
131       UUID clusterId = null;
132       try {
133         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
134       } catch (KeeperException ke) {
135         throw new IOException("Could not read cluster id", ke);
136       }
137       this.replicationManager =
138           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
139               conf, this.server, fs, logDir, oldLogDir, clusterId);
140       this.statsThreadPeriod =
141           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
142       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
143       this.replicationLoad = new ReplicationLoad();
144     } else {
145       this.replicationManager = null;
146       this.replicationQueues = null;
147       this.replicationPeers = null;
148       this.replicationTracker = null;
149       this.replicationLoad = null;
150     }
151   }
152 
153    /**
154     * @param c Configuration to look at
155     * @return True if replication is enabled.
156     */
157   public static boolean isReplication(final Configuration c) {
158     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
159   }
160 
161    /*
162     * Returns an object to listen to new wal changes
163     **/
164   public WALActionsListener getWALActionsListener() {
165     return this;
166   }
167   /**
168    * Stops replication service.
169    */
170   public void stopReplicationService() {
171     join();
172   }
173 
174   /**
175    * Join with the replication threads
176    */
177   public void join() {
178     if (this.replication) {
179       this.replicationManager.join();
180       if (this.replicationSink != null) {
181         this.replicationSink.stopReplicationSinkServices();
182       }
183     }
184     scheduleThreadPool.shutdown();
185   }
186 
187   /**
188    * Carry on the list of log entries down to the sink
189    * @param entries list of entries to replicate
190    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
191    * do not contain the Cells we are replicating; they are passed here on the side in this
192    * CellScanner).
193    * @throws IOException
194    */
195   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
196     if (this.replication) {
197       this.replicationSink.replicateEntries(entries, cells);
198     }
199   }
200 
201   /**
202    * If replication is enabled and this cluster is a master,
203    * it starts
204    * @throws IOException
205    */
206   public void startReplicationService() throws IOException {
207     if (this.replication) {
208       try {
209         this.replicationManager.init();
210       } catch (ReplicationException e) {
211         throw new IOException(e);
212       }
213       this.replicationSink = new ReplicationSink(this.conf, this.server);
214       this.scheduleThreadPool.scheduleAtFixedRate(
215         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
216         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
217     }
218   }
219 
220   /**
221    * Get the replication sources manager
222    * @return the manager if replication is enabled, else returns false
223    */
224   public ReplicationSourceManager getReplicationManager() {
225     return this.replicationManager;
226   }
227 
228   @Override
229   public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
230                                        WALEdit logEdit) {
231     scopeWALEdits(htd, logKey, logEdit);
232   }
233 
234   /**
235    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
236    * from compaction WAL edits and if the scope is local.
237    * @param htd Descriptor used to find the scope to use
238    * @param logKey Key that may get scoped according to its edits
239    * @param logEdit Edits used to lookup the scopes
240    */
241   public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
242                                    WALEdit logEdit) {
243     NavigableMap<byte[], Integer> scopes =
244         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
245     byte[] family;
246     for (Cell cell : logEdit.getCells()) {
247       family = CellUtil.cloneFamily(cell);
248       // This is expected and the KV should not be replicated
249       if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
250       // Unexpected, has a tendency to happen in unit tests
251       assert htd.getFamily(family) != null;
252 
253       int scope = htd.getFamily(family).getScope();
254       if (scope != REPLICATION_SCOPE_LOCAL &&
255           !scopes.containsKey(family)) {
256         scopes.put(family, scope);
257       }
258     }
259     if (!scopes.isEmpty()) {
260       logKey.setScopes(scopes);
261     }
262   }
263 
264   @Override
265   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
266     getReplicationManager().preLogRoll(newPath);
267   }
268 
269   @Override
270   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
271     getReplicationManager().postLogRoll(newPath);
272   }
273 
274   /**
275    * This method modifies the master's configuration in order to inject
276    * replication-related features
277    * @param conf
278    */
279   public static void decorateMasterConfiguration(Configuration conf) {
280     if (!isReplication(conf)) {
281       return;
282     }
283     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
284     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
285     if (!plugins.contains(cleanerClass)) {
286       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
287     }
288   }
289 
290   /*
291    * Statistics thread. Periodically prints the cache statistics to the log.
292    */
293   static class ReplicationStatisticsThread extends Thread {
294 
295     private final ReplicationSink replicationSink;
296     private final ReplicationSourceManager replicationManager;
297 
298     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
299                             final ReplicationSourceManager replicationManager) {
300       super("ReplicationStatisticsThread");
301       this.replicationManager = replicationManager;
302       this.replicationSink = replicationSink;
303     }
304 
305     @Override
306     public void run() {
307       printStats(this.replicationManager.getStats());
308       printStats(this.replicationSink.getStats());
309     }
310 
311     private void printStats(String stats) {
312       if (!stats.isEmpty()) {
313         LOG.info(stats);
314       }
315     }
316   }
317 
318   @Override
319   public ReplicationLoad refreshAndGetReplicationLoad() {
320     if (this.replicationLoad == null) {
321       return null;
322     }
323     // always build for latest data
324     buildReplicationLoad();
325     return this.replicationLoad;
326   }
327 
328   private void buildReplicationLoad() {
329     // get source
330     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
331     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
332 
333     for (ReplicationSourceInterface source : sources) {
334       if (source instanceof ReplicationSource) {
335         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
336       }
337     }
338     // get sink
339     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
340     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
341   }
342 }