View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import java.io.IOException;
22  import java.util.List;
23  import java.util.NavigableMap;
24  import java.util.TreeMap;
25  import java.util.UUID;
26  import java.util.concurrent.Executors;
27  import java.util.concurrent.ScheduledExecutorService;
28  import java.util.concurrent.TimeUnit;
29  
30  import com.google.common.util.concurrent.ThreadFactoryBuilder;
31  import org.apache.commons.logging.Log;
32  import org.apache.commons.logging.LogFactory;
33  import org.apache.hadoop.classification.InterfaceAudience;
34  import org.apache.hadoop.conf.Configuration;
35  import org.apache.hadoop.fs.FileSystem;
36  import org.apache.hadoop.fs.Path;
37  import org.apache.hadoop.hbase.CellScanner;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.HTableDescriptor;
41  import org.apache.hadoop.hbase.KeyValue;
42  import org.apache.hadoop.hbase.Server;
43  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
44  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
45  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
46  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
47  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
48  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
49  import org.apache.hadoop.hbase.replication.ReplicationException;
50  import org.apache.hadoop.hbase.replication.ReplicationFactory;
51  import org.apache.hadoop.hbase.replication.ReplicationPeers;
52  import org.apache.hadoop.hbase.replication.ReplicationQueues;
53  import org.apache.hadoop.hbase.replication.ReplicationTracker;
54  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
55  import org.apache.hadoop.hbase.util.Bytes;
56  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
57  import org.apache.zookeeper.KeeperException;
58  
59  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
60  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
61  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
62  
63  /**
64   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
65   */
66  @InterfaceAudience.Private
67  public class Replication implements WALActionsListener, 
68    ReplicationSourceService, ReplicationSinkService {
69    private static final Log LOG =
70        LogFactory.getLog(Replication.class);
71    private boolean replication;
72    private ReplicationSourceManager replicationManager;
73    private ReplicationQueues replicationQueues;
74    private ReplicationPeers replicationPeers;
75    private ReplicationTracker replicationTracker;
76    private Configuration conf;
77    private ReplicationSink replicationSink;
78    // Hosting server
79    private Server server;
80    /** Statistics thread schedule pool */
81    private ScheduledExecutorService scheduleThreadPool;
82    private int statsThreadPeriod;
83  
84    /**
85     * Instantiate the replication management (if rep is enabled).
86     * @param server Hosting server
87     * @param fs handle to the filesystem
88     * @param logDir
89     * @param oldLogDir directory where logs are archived
90     * @throws IOException
91     */
92    public Replication(final Server server, final FileSystem fs,
93        final Path logDir, final Path oldLogDir) throws IOException{
94      initialize(server, fs, logDir, oldLogDir);
95    }
96  
97    /**
98     * Empty constructor
99     */
100   public Replication() {
101   }
102 
103   public void initialize(final Server server, final FileSystem fs,
104       final Path logDir, final Path oldLogDir) throws IOException {
105     this.server = server;
106     this.conf = this.server.getConfiguration();
107     this.replication = isReplication(this.conf);
108     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
109       new ThreadFactoryBuilder()
110         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
111         .setDaemon(true)
112         .build());
113     if (replication) {
114       try {
115         this.replicationQueues =
116             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
117         this.replicationQueues.init(this.server.getServerName().toString());
118         this.replicationPeers =
119             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
120         this.replicationPeers.init();
121         this.replicationTracker =
122             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
123               this.conf, this.server, this.server);
124       } catch (ReplicationException e) {
125         throw new IOException("Failed replication handler create", e);
126       }
127       UUID clusterId = null;
128       try {
129         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
130       } catch (KeeperException ke) {
131         throw new IOException("Could not read cluster id", ke);
132       }
133       this.replicationManager =
134           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
135               conf, this.server, fs, logDir, oldLogDir, clusterId);
136       this.statsThreadPeriod =
137           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
138       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
139     } else {
140       this.replicationManager = null;
141       this.replicationQueues = null;
142       this.replicationPeers = null;
143       this.replicationTracker = null;
144     }
145   }
146 
147    /**
148     * @param c Configuration to look at
149     * @return True if replication is enabled.
150     */
151   public static boolean isReplication(final Configuration c) {
152     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
153   }
154 
155    /*
156     * Returns an object to listen to new hlog changes
157     **/
158   public WALActionsListener getWALActionsListener() {
159     return this;
160   }
161   /**
162    * Stops replication service.
163    */
164   public void stopReplicationService() {
165     join();
166   }
167 
168   /**
169    * Join with the replication threads
170    */
171   public void join() {
172     if (this.replication) {
173       this.replicationManager.join();
174       if (this.replicationSink != null) {
175         this.replicationSink.stopReplicationSinkServices();
176       }
177     }
178     scheduleThreadPool.shutdown();
179   }
180 
181   /**
182    * Carry on the list of log entries down to the sink
183    * @param entries list of entries to replicate
184    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
185    * do not contain the Cells we are replicating; they are passed here on the side in this
186    * CellScanner).
187    * @throws IOException
188    */
189   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
190     if (this.replication) {
191       this.replicationSink.replicateEntries(entries, cells);
192     }
193   }
194 
195   /**
196    * If replication is enabled and this cluster is a master,
197    * it starts
198    * @throws IOException
199    */
200   public void startReplicationService() throws IOException {
201     if (this.replication) {
202       try {
203         this.replicationManager.init();
204       } catch (ReplicationException e) {
205         throw new IOException(e);
206       }
207       this.replicationSink = new ReplicationSink(this.conf, this.server);
208       this.scheduleThreadPool.scheduleAtFixedRate(
209         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
210         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
211     }
212   }
213 
214   /**
215    * Get the replication sources manager
216    * @return the manager if replication is enabled, else returns false
217    */
218   public ReplicationSourceManager getReplicationManager() {
219     return this.replicationManager;
220   }
221 
222   @Override
223   public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
224       WALEdit logEdit) {
225     // Not interested
226   }
227 
228   @Override
229   public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
230                                        WALEdit logEdit) {
231     scopeWALEdits(htd, logKey, logEdit);
232   }
233 
234   /**
235    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
236    * from compaction WAL edits and if the scope is local.
237    * @param htd Descriptor used to find the scope to use
238    * @param logKey Key that may get scoped according to its edits
239    * @param logEdit Edits used to lookup the scopes
240    */
241   public static void scopeWALEdits(HTableDescriptor htd, HLogKey logKey,
242                                    WALEdit logEdit) {
243     NavigableMap<byte[], Integer> scopes =
244         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
245     byte[] family;
246     for (KeyValue kv : logEdit.getKeyValues()) {
247       family = kv.getFamily();
248       // This is expected and the KV should not be replicated
249       if (kv.matchingFamily(WALEdit.METAFAMILY)) continue;
250       // Unexpected, has a tendency to happen in unit tests
251       assert htd.getFamily(family) != null;
252 
253       int scope = htd.getFamily(family).getScope();
254       if (scope != REPLICATION_SCOPE_LOCAL &&
255           !scopes.containsKey(family)) {
256         scopes.put(family, scope);
257       }
258     }
259     if (!scopes.isEmpty()) {
260       logKey.setScopes(scopes);
261     }
262   }
263 
264   @Override
265   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
266     getReplicationManager().preLogRoll(newPath);
267   }
268 
269   @Override
270   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
271     getReplicationManager().postLogRoll(newPath);
272   }
273 
274   @Override
275   public void preLogArchive(Path oldPath, Path newPath) throws IOException {
276     // Not interested
277   }
278 
279   @Override
280   public void postLogArchive(Path oldPath, Path newPath) throws IOException {
281     // Not interested
282   }
283 
284   /**
285    * This method modifies the master's configuration in order to inject
286    * replication-related features
287    * @param conf
288    */
289   public static void decorateMasterConfiguration(Configuration conf) {
290     if (!isReplication(conf)) {
291       return;
292     }
293     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
294     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
295     if (!plugins.contains(cleanerClass)) {
296       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
297     }
298   }
299 
300   @Override
301   public void logRollRequested() {
302     // Not interested
303   }
304 
305   @Override
306   public void logCloseRequested() {
307     // not interested
308   }
309 
310   /*
311    * Statistics thread. Periodically prints the cache statistics to the log.
312    */
313   static class ReplicationStatisticsThread extends Thread {
314 
315     private final ReplicationSink replicationSink;
316     private final ReplicationSourceManager replicationManager;
317 
318     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
319                             final ReplicationSourceManager replicationManager) {
320       super("ReplicationStatisticsThread");
321       this.replicationManager = replicationManager;
322       this.replicationSink = replicationSink;
323     }
324 
325     @Override
326     public void run() {
327       printStats(this.replicationManager.getStats());
328       printStats(this.replicationSink.getStats());
329     }
330 
331     private void printStats(String stats) {
332       if (!stats.isEmpty()) {
333         LOG.info(stats);
334       }
335     }
336   }
337 }