View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
24  
25  import java.io.IOException;
26  import java.util.ArrayList;
27  import java.util.List;
28  import java.util.NavigableMap;
29  import java.util.TreeMap;
30  import java.util.UUID;
31  import java.util.concurrent.Executors;
32  import java.util.concurrent.ScheduledExecutorService;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.hbase.classification.InterfaceAudience;
38  import org.apache.hadoop.conf.Configuration;
39  import org.apache.hadoop.fs.FileSystem;
40  import org.apache.hadoop.fs.Path;
41  import org.apache.hadoop.hbase.Cell;
42  import org.apache.hadoop.hbase.CellScanner;
43  import org.apache.hadoop.hbase.CellUtil;
44  import org.apache.hadoop.hbase.HConstants;
45  import org.apache.hadoop.hbase.HRegionInfo;
46  import org.apache.hadoop.hbase.HTableDescriptor;
47  import org.apache.hadoop.hbase.Server;
48  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
49  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
50  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
51  import org.apache.hadoop.hbase.wal.WALKey;
52  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
53  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
54  import org.apache.hadoop.hbase.replication.ReplicationException;
55  import org.apache.hadoop.hbase.replication.ReplicationFactory;
56  import org.apache.hadoop.hbase.replication.ReplicationPeers;
57  import org.apache.hadoop.hbase.replication.ReplicationQueues;
58  import org.apache.hadoop.hbase.replication.ReplicationTracker;
59  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
60  import org.apache.hadoop.hbase.util.Bytes;
61  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
62  import org.apache.zookeeper.KeeperException;
63  
64  import com.google.common.util.concurrent.ThreadFactoryBuilder;
65  
66  /**
67   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
68   */
69  @InterfaceAudience.Private
70  public class Replication extends WALActionsListener.Base implements
71    ReplicationSourceService, ReplicationSinkService {
72    private static final Log LOG =
73        LogFactory.getLog(Replication.class);
74    private boolean replication;
75    private ReplicationSourceManager replicationManager;
76    private ReplicationQueues replicationQueues;
77    private ReplicationPeers replicationPeers;
78    private ReplicationTracker replicationTracker;
79    private Configuration conf;
80    private ReplicationSink replicationSink;
81    // Hosting server
82    private Server server;
83    /** Statistics thread schedule pool */
84    private ScheduledExecutorService scheduleThreadPool;
85    private int statsThreadPeriod;
86    // ReplicationLoad to access replication metrics
87    private ReplicationLoad replicationLoad;
88  
89    /**
90     * Instantiate the replication management (if rep is enabled).
91     * @param server Hosting server
92     * @param fs handle to the filesystem
93     * @param logDir
94     * @param oldLogDir directory where logs are archived
95     * @throws IOException
96     */
97    public Replication(final Server server, final FileSystem fs,
98        final Path logDir, final Path oldLogDir) throws IOException{
99      initialize(server, fs, logDir, oldLogDir);
100   }
101 
102   /**
103    * Empty constructor
104    */
105   public Replication() {
106   }
107 
108   public void initialize(final Server server, final FileSystem fs,
109       final Path logDir, final Path oldLogDir) throws IOException {
110     this.server = server;
111     this.conf = this.server.getConfiguration();
112     this.replication = isReplication(this.conf);
113     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
114       new ThreadFactoryBuilder()
115         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
116         .setDaemon(true)
117         .build());
118     if (replication) {
119       try {
120         this.replicationQueues =
121             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
122         this.replicationQueues.init(this.server.getServerName().toString());
123         this.replicationPeers =
124             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
125         this.replicationPeers.init();
126         this.replicationTracker =
127             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
128               this.conf, this.server, this.server);
129       } catch (ReplicationException e) {
130         throw new IOException("Failed replication handler create", e);
131       }
132       UUID clusterId = null;
133       try {
134         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
135       } catch (KeeperException ke) {
136         throw new IOException("Could not read cluster id", ke);
137       }
138       this.replicationManager =
139           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
140               conf, this.server, fs, logDir, oldLogDir, clusterId);
141       this.statsThreadPeriod =
142           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
143       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
144       this.replicationLoad = new ReplicationLoad();
145     } else {
146       this.replicationManager = null;
147       this.replicationQueues = null;
148       this.replicationPeers = null;
149       this.replicationTracker = null;
150       this.replicationLoad = null;
151     }
152   }
153 
154    /**
155     * @param c Configuration to look at
156     * @return True if replication is enabled.
157     */
158   public static boolean isReplication(final Configuration c) {
159     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
160   }
161 
162    /*
163     * Returns an object to listen to new wal changes
164     **/
165   public WALActionsListener getWALActionsListener() {
166     return this;
167   }
168   /**
169    * Stops replication service.
170    */
171   public void stopReplicationService() {
172     join();
173   }
174 
175   /**
176    * Join with the replication threads
177    */
178   public void join() {
179     if (this.replication) {
180       this.replicationManager.join();
181       if (this.replicationSink != null) {
182         this.replicationSink.stopReplicationSinkServices();
183       }
184     }
185     scheduleThreadPool.shutdown();
186   }
187 
188   /**
189    * Carry on the list of log entries down to the sink
190    * @param entries list of entries to replicate
191    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
192    * do not contain the Cells we are replicating; they are passed here on the side in this
193    * CellScanner).
194    * @throws IOException
195    */
196   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
197     if (this.replication) {
198       this.replicationSink.replicateEntries(entries, cells);
199     }
200   }
201 
202   /**
203    * If replication is enabled and this cluster is a master,
204    * it starts
205    * @throws IOException
206    */
207   public void startReplicationService() throws IOException {
208     if (this.replication) {
209       try {
210         this.replicationManager.init();
211       } catch (ReplicationException e) {
212         throw new IOException(e);
213       }
214       this.replicationSink = new ReplicationSink(this.conf, this.server);
215       this.scheduleThreadPool.scheduleAtFixedRate(
216         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
217         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
218     }
219   }
220 
221   /**
222    * Get the replication sources manager
223    * @return the manager if replication is enabled, else returns false
224    */
225   public ReplicationSourceManager getReplicationManager() {
226     return this.replicationManager;
227   }
228 
229   @Override
230   public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
231                                        WALEdit logEdit) {
232     scopeWALEdits(htd, logKey, logEdit);
233   }
234 
235   /**
236    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
237    * from compaction WAL edits and if the scope is local.
238    * @param htd Descriptor used to find the scope to use
239    * @param logKey Key that may get scoped according to its edits
240    * @param logEdit Edits used to lookup the scopes
241    */
242   public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
243                                    WALEdit logEdit) {
244     NavigableMap<byte[], Integer> scopes =
245         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
246     byte[] family;
247     for (Cell cell : logEdit.getCells()) {
248       family = cell.getFamily();
249       // This is expected and the KV should not be replicated
250       if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
251       // Unexpected, has a tendency to happen in unit tests
252       assert htd.getFamily(family) != null;
253 
254       int scope = htd.getFamily(family).getScope();
255       if (scope != REPLICATION_SCOPE_LOCAL &&
256           !scopes.containsKey(family)) {
257         scopes.put(family, scope);
258       }
259     }
260     if (!scopes.isEmpty()) {
261       logKey.setScopes(scopes);
262     }
263   }
264 
265   @Override
266   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
267     getReplicationManager().preLogRoll(newPath);
268   }
269 
270   @Override
271   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
272     getReplicationManager().postLogRoll(newPath);
273   }
274 
275   /**
276    * This method modifies the master's configuration in order to inject
277    * replication-related features
278    * @param conf
279    */
280   public static void decorateMasterConfiguration(Configuration conf) {
281     if (!isReplication(conf)) {
282       return;
283     }
284     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
285     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
286     if (!plugins.contains(cleanerClass)) {
287       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
288     }
289   }
290 
291   /*
292    * Statistics thread. Periodically prints the cache statistics to the log.
293    */
294   static class ReplicationStatisticsThread extends Thread {
295 
296     private final ReplicationSink replicationSink;
297     private final ReplicationSourceManager replicationManager;
298 
299     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
300                             final ReplicationSourceManager replicationManager) {
301       super("ReplicationStatisticsThread");
302       this.replicationManager = replicationManager;
303       this.replicationSink = replicationSink;
304     }
305 
306     @Override
307     public void run() {
308       printStats(this.replicationManager.getStats());
309       printStats(this.replicationSink.getStats());
310     }
311 
312     private void printStats(String stats) {
313       if (!stats.isEmpty()) {
314         LOG.info(stats);
315       }
316     }
317   }
318 
319   @Override
320   public ReplicationLoad refreshAndGetReplicationLoad() {
321     if (this.replicationLoad == null) {
322       return null;
323     }
324     // always build for latest data
325     buildReplicationLoad();
326     return this.replicationLoad;
327   }
328 
329   private void buildReplicationLoad() {
330     // get source
331     List<ReplicationSourceInterface> sources = this.replicationManager.getSources();
332     List<MetricsSource> sourceMetricsList = new ArrayList<MetricsSource>();
333 
334     for (ReplicationSourceInterface source : sources) {
335       if (source instanceof ReplicationSource) {
336         sourceMetricsList.add(((ReplicationSource) source).getSourceMetrics());
337       }
338     }
339     // get sink
340     MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
341     this.replicationLoad.buildReplicationLoad(sourceMetricsList, sinkMetrics);
342   }
343 }