View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
24  
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellScanner;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HRegionInfo;
45  import org.apache.hadoop.hbase.HTableDescriptor;
46  import org.apache.hadoop.hbase.Server;
47  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
48  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
49  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
50  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
51  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
52  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
53  import org.apache.hadoop.hbase.replication.ReplicationException;
54  import org.apache.hadoop.hbase.replication.ReplicationFactory;
55  import org.apache.hadoop.hbase.replication.ReplicationPeers;
56  import org.apache.hadoop.hbase.replication.ReplicationQueues;
57  import org.apache.hadoop.hbase.replication.ReplicationTracker;
58  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
59  import org.apache.hadoop.hbase.util.Bytes;
60  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
61  import org.apache.zookeeper.KeeperException;
62  
63  import com.google.common.util.concurrent.ThreadFactoryBuilder;
64  
65  /**
66   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
67   */
68  @InterfaceAudience.Private
69  public class Replication implements WALActionsListener, 
70    ReplicationSourceService, ReplicationSinkService {
71    private static final Log LOG =
72        LogFactory.getLog(Replication.class);
73    private boolean replication;
74    private ReplicationSourceManager replicationManager;
75    private ReplicationQueues replicationQueues;
76    private ReplicationPeers replicationPeers;
77    private ReplicationTracker replicationTracker;
78    private Configuration conf;
79    private ReplicationSink replicationSink;
80    // Hosting server
81    private Server server;
82    /** Statistics thread schedule pool */
83    private ScheduledExecutorService scheduleThreadPool;
84    private int statsThreadPeriod;
85  
86    /**
87     * Instantiate the replication management (if rep is enabled).
88     * @param server Hosting server
89     * @param fs handle to the filesystem
90     * @param logDir
91     * @param oldLogDir directory where logs are archived
92     * @throws IOException
93     */
94    public Replication(final Server server, final FileSystem fs,
95        final Path logDir, final Path oldLogDir) throws IOException{
96      initialize(server, fs, logDir, oldLogDir);
97    }
98  
99    /**
100    * Empty constructor
101    */
102   public Replication() {
103   }
104 
105   public void initialize(final Server server, final FileSystem fs,
106       final Path logDir, final Path oldLogDir) throws IOException {
107     this.server = server;
108     this.conf = this.server.getConfiguration();
109     this.replication = isReplication(this.conf);
110     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
111       new ThreadFactoryBuilder()
112         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
113         .setDaemon(true)
114         .build());
115     if (replication) {
116       try {
117         this.replicationQueues =
118             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
119         this.replicationQueues.init(this.server.getServerName().toString());
120         this.replicationPeers =
121             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
122         this.replicationPeers.init();
123         this.replicationTracker =
124             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
125               this.conf, this.server, this.server);
126       } catch (ReplicationException e) {
127         throw new IOException("Failed replication handler create", e);
128       }
129       UUID clusterId = null;
130       try {
131         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
132       } catch (KeeperException ke) {
133         throw new IOException("Could not read cluster id", ke);
134       }
135       this.replicationManager =
136           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
137               conf, this.server, fs, logDir, oldLogDir, clusterId);
138       this.statsThreadPeriod =
139           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
140       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
141     } else {
142       this.replicationManager = null;
143       this.replicationQueues = null;
144       this.replicationPeers = null;
145       this.replicationTracker = null;
146     }
147   }
148 
149    /**
150     * @param c Configuration to look at
151     * @return True if replication is enabled.
152     */
153   public static boolean isReplication(final Configuration c) {
154     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
155   }
156 
157    /*
158     * Returns an object to listen to new hlog changes
159     **/
160   public WALActionsListener getWALActionsListener() {
161     return this;
162   }
163   /**
164    * Stops replication service.
165    */
166   public void stopReplicationService() {
167     join();
168   }
169 
170   /**
171    * Join with the replication threads
172    */
173   public void join() {
174     if (this.replication) {
175       this.replicationManager.join();
176       if (this.replicationSink != null) {
177         this.replicationSink.stopReplicationSinkServices();
178       }
179     }
180     scheduleThreadPool.shutdown();
181   }
182 
183   /**
184    * Carry on the list of log entries down to the sink
185    * @param entries list of entries to replicate
186    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
187    * do not contain the Cells we are replicating; they are passed here on the side in this
188    * CellScanner).
189    * @throws IOException
190    */
191   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
192     if (this.replication) {
193       this.replicationSink.replicateEntries(entries, cells);
194     }
195   }
196 
197   /**
198    * If replication is enabled and this cluster is a master,
199    * it starts
200    * @throws IOException
201    */
202   public void startReplicationService() throws IOException {
203     if (this.replication) {
204       try {
205         this.replicationManager.init();
206       } catch (ReplicationException e) {
207         throw new IOException(e);
208       }
209       this.replicationSink = new ReplicationSink(this.conf, this.server);
210       this.scheduleThreadPool.scheduleAtFixedRate(
211         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
212         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
213     }
214   }
215 
216   /**
217    * Get the replication sources manager
218    * @return the manager if replication is enabled, else returns false
219    */
220   public ReplicationSourceManager getReplicationManager() {
221     return this.replicationManager;
222   }
223 
224   @Override
225   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
226       WALEdit logEdit) {
227     // Not interested
228   }
229 
230   @Override
231   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
232                                        WALEdit logEdit) {
233     scopeWALEdits(htd, logKey, logEdit);
234   }
235 
236   /**
237    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
238    * from compaction WAL edits and if the scope is local.
239    * @param htd Descriptor used to find the scope to use
240    * @param logKey Key that may get scoped according to its edits
241    * @param logEdit Edits used to lookup the scopes
242    */
243   public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
244                                    WALEdit logEdit) {
245     NavigableMap<byte[], Integer> scopes =
246         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
247     byte[] family;
248     for (Cell cell : logEdit.getCells()) {
249       family = cell.getFamily();
250       // This is expected and the KV should not be replicated
251       if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
252       // Unexpected, has a tendency to happen in unit tests
253       assert htd.getFamily(family) != null;
254 
255       int scope = htd.getFamily(family).getScope();
256       if (scope != REPLICATION_SCOPE_LOCAL &&
257           !scopes.containsKey(family)) {
258         scopes.put(family, scope);
259       }
260     }
261     if (!scopes.isEmpty()) {
262       logKey.setScopes(scopes);
263     }
264   }
265 
266   @Override
267   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
268     getReplicationManager().preLogRoll(newPath);
269   }
270 
271   @Override
272   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
273     getReplicationManager().postLogRoll(newPath);
274   }
275 
276   @Override
277   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
278     // Not interested
279   }
280 
281   @Override
282   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
283     // Not interested
284   }
285 
286   /**
287    * This method modifies the master's configuration in order to inject
288    * replication-related features
289    * @param conf
290    */
291   public static void decorateMasterConfiguration(Configuration conf) {
292     if (!isReplication(conf)) {
293       return;
294     }
295     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
296     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
297     if (!plugins.contains(cleanerClass)) {
298       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
299     }
300   }
301 
302   @Override
303   public void logRollRequested() {
304     // Not interested
305   }
306 
307   @Override
308   public void logCloseRequested() {
309     // not interested
310   }
311 
312   /*
313    * Statistics thread. Periodically prints the cache statistics to the log.
314    */
315   static class ReplicationStatisticsThread extends Thread {
316 
317     private final ReplicationSink replicationSink;
318     private final ReplicationSourceManager replicationManager;
319 
320     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
321                             final ReplicationSourceManager replicationManager) {
322       super("ReplicationStatisticsThread");
323       this.replicationManager = replicationManager;
324       this.replicationSink = replicationSink;
325     }
326 
327     @Override
328     public void run() {
329       printStats(this.replicationManager.getStats());
330       printStats(this.replicationSink.getStats());
331     }
332 
333     private void printStats(String stats) {
334       if (!stats.isEmpty()) {
335         LOG.info(stats);
336       }
337     }
338   }
339 }