View Javadoc

1   /*
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.replication.regionserver;
20  
21  import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS;
22  import static org.apache.hadoop.hbase.HConstants.REPLICATION_ENABLE_KEY;
23  import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
24  
25  import java.io.IOException;
26  import java.util.List;
27  import java.util.NavigableMap;
28  import java.util.TreeMap;
29  import java.util.UUID;
30  import java.util.concurrent.Executors;
31  import java.util.concurrent.ScheduledExecutorService;
32  import java.util.concurrent.TimeUnit;
33  
34  import org.apache.commons.logging.Log;
35  import org.apache.commons.logging.LogFactory;
36  import org.apache.hadoop.hbase.classification.InterfaceAudience;
37  import org.apache.hadoop.conf.Configuration;
38  import org.apache.hadoop.fs.FileSystem;
39  import org.apache.hadoop.fs.Path;
40  import org.apache.hadoop.hbase.Cell;
41  import org.apache.hadoop.hbase.CellScanner;
42  import org.apache.hadoop.hbase.CellUtil;
43  import org.apache.hadoop.hbase.HConstants;
44  import org.apache.hadoop.hbase.HTableDescriptor;
45  import org.apache.hadoop.hbase.Server;
46  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
47  import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
48  import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
49  import org.apache.hadoop.hbase.wal.WALKey;
50  import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
51  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
52  import org.apache.hadoop.hbase.replication.ReplicationException;
53  import org.apache.hadoop.hbase.replication.ReplicationFactory;
54  import org.apache.hadoop.hbase.replication.ReplicationPeers;
55  import org.apache.hadoop.hbase.replication.ReplicationQueues;
56  import org.apache.hadoop.hbase.replication.ReplicationTracker;
57  import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
58  import org.apache.hadoop.hbase.util.Bytes;
59  import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
60  import org.apache.zookeeper.KeeperException;
61  
62  import com.google.common.util.concurrent.ThreadFactoryBuilder;
63  
64  /**
65   * Gateway to Replication.  Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
66   */
67  @InterfaceAudience.Private
68  public class Replication extends WALActionsListener.Base implements 
69    ReplicationSourceService, ReplicationSinkService {
70    private static final Log LOG =
71        LogFactory.getLog(Replication.class);
72    private boolean replication;
73    private ReplicationSourceManager replicationManager;
74    private ReplicationQueues replicationQueues;
75    private ReplicationPeers replicationPeers;
76    private ReplicationTracker replicationTracker;
77    private Configuration conf;
78    private ReplicationSink replicationSink;
79    // Hosting server
80    private Server server;
81    /** Statistics thread schedule pool */
82    private ScheduledExecutorService scheduleThreadPool;
83    private int statsThreadPeriod;
84  
85    /**
86     * Instantiate the replication management (if rep is enabled).
87     * @param server Hosting server
88     * @param fs handle to the filesystem
89     * @param logDir
90     * @param oldLogDir directory where logs are archived
91     * @throws IOException
92     */
93    public Replication(final Server server, final FileSystem fs,
94        final Path logDir, final Path oldLogDir) throws IOException{
95      initialize(server, fs, logDir, oldLogDir);
96    }
97  
98    /**
99     * Empty constructor
100    */
101   public Replication() {
102   }
103 
104   public void initialize(final Server server, final FileSystem fs,
105       final Path logDir, final Path oldLogDir) throws IOException {
106     this.server = server;
107     this.conf = this.server.getConfiguration();
108     this.replication = isReplication(this.conf);
109     this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
110       new ThreadFactoryBuilder()
111         .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
112         .setDaemon(true)
113         .build());
114     if (replication) {
115       try {
116         this.replicationQueues =
117             ReplicationFactory.getReplicationQueues(server.getZooKeeper(), this.conf, this.server);
118         this.replicationQueues.init(this.server.getServerName().toString());
119         this.replicationPeers =
120             ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf, this.server);
121         this.replicationPeers.init();
122         this.replicationTracker =
123             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
124               this.conf, this.server, this.server);
125       } catch (ReplicationException e) {
126         throw new IOException("Failed replication handler create", e);
127       }
128       UUID clusterId = null;
129       try {
130         clusterId = ZKClusterId.getUUIDForCluster(this.server.getZooKeeper());
131       } catch (KeeperException ke) {
132         throw new IOException("Could not read cluster id", ke);
133       }
134       this.replicationManager =
135           new ReplicationSourceManager(replicationQueues, replicationPeers, replicationTracker,
136               conf, this.server, fs, logDir, oldLogDir, clusterId);
137       this.statsThreadPeriod =
138           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
139       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);
140     } else {
141       this.replicationManager = null;
142       this.replicationQueues = null;
143       this.replicationPeers = null;
144       this.replicationTracker = null;
145     }
146   }
147 
148    /**
149     * @param c Configuration to look at
150     * @return True if replication is enabled.
151     */
152   public static boolean isReplication(final Configuration c) {
153     return c.getBoolean(REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
154   }
155 
156    /*
157     * Returns an object to listen to new wal changes
158     **/
159   public WALActionsListener getWALActionsListener() {
160     return this;
161   }
162   /**
163    * Stops replication service.
164    */
165   public void stopReplicationService() {
166     join();
167   }
168 
169   /**
170    * Join with the replication threads
171    */
172   public void join() {
173     if (this.replication) {
174       this.replicationManager.join();
175       if (this.replicationSink != null) {
176         this.replicationSink.stopReplicationSinkServices();
177       }
178     }
179     scheduleThreadPool.shutdown();
180   }
181 
182   /**
183    * Carry on the list of log entries down to the sink
184    * @param entries list of entries to replicate
185    * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
186    * do not contain the Cells we are replicating; they are passed here on the side in this
187    * CellScanner).
188    * @throws IOException
189    */
190   public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
191     if (this.replication) {
192       this.replicationSink.replicateEntries(entries, cells);
193     }
194   }
195 
196   /**
197    * If replication is enabled and this cluster is a master,
198    * it starts
199    * @throws IOException
200    */
201   public void startReplicationService() throws IOException {
202     if (this.replication) {
203       try {
204         this.replicationManager.init();
205       } catch (ReplicationException e) {
206         throw new IOException(e);
207       }
208       this.replicationSink = new ReplicationSink(this.conf, this.server);
209       this.scheduleThreadPool.scheduleAtFixedRate(
210         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
211         statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
212     }
213   }
214 
215   /**
216    * Get the replication sources manager
217    * @return the manager if replication is enabled, else returns false
218    */
219   public ReplicationSourceManager getReplicationManager() {
220     return this.replicationManager;
221   }
222 
223   @Override
224   public void visitLogEntryBeforeWrite(HTableDescriptor htd, WALKey logKey,
225                                        WALEdit logEdit) {
226     scopeWALEdits(htd, logKey, logEdit);
227   }
228 
229   /**
230    * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys
231    * from compaction WAL edits and if the scope is local.
232    * @param htd Descriptor used to find the scope to use
233    * @param logKey Key that may get scoped according to its edits
234    * @param logEdit Edits used to lookup the scopes
235    */
236   public static void scopeWALEdits(HTableDescriptor htd, WALKey logKey,
237                                    WALEdit logEdit) {
238     NavigableMap<byte[], Integer> scopes =
239         new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
240     byte[] family;
241     for (Cell cell : logEdit.getCells()) {
242       family = cell.getFamily();
243       // This is expected and the KV should not be replicated
244       if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) continue;
245       // Unexpected, has a tendency to happen in unit tests
246       assert htd.getFamily(family) != null;
247 
248       int scope = htd.getFamily(family).getScope();
249       if (scope != REPLICATION_SCOPE_LOCAL &&
250           !scopes.containsKey(family)) {
251         scopes.put(family, scope);
252       }
253     }
254     if (!scopes.isEmpty()) {
255       logKey.setScopes(scopes);
256     }
257   }
258 
259   @Override
260   public void preLogRoll(Path oldPath, Path newPath) throws IOException {
261     getReplicationManager().preLogRoll(newPath);
262   }
263 
264   @Override
265   public void postLogRoll(Path oldPath, Path newPath) throws IOException {
266     getReplicationManager().postLogRoll(newPath);
267   }
268 
269   /**
270    * This method modifies the master's configuration in order to inject
271    * replication-related features
272    * @param conf
273    */
274   public static void decorateMasterConfiguration(Configuration conf) {
275     if (!isReplication(conf)) {
276       return;
277     }
278     String plugins = conf.get(HBASE_MASTER_LOGCLEANER_PLUGINS);
279     String cleanerClass = ReplicationLogCleaner.class.getCanonicalName();
280     if (!plugins.contains(cleanerClass)) {
281       conf.set(HBASE_MASTER_LOGCLEANER_PLUGINS, plugins + "," + cleanerClass);
282     }
283   }
284 
285   /*
286    * Statistics thread. Periodically prints the cache statistics to the log.
287    */
288   static class ReplicationStatisticsThread extends Thread {
289 
290     private final ReplicationSink replicationSink;
291     private final ReplicationSourceManager replicationManager;
292 
293     public ReplicationStatisticsThread(final ReplicationSink replicationSink,
294                             final ReplicationSourceManager replicationManager) {
295       super("ReplicationStatisticsThread");
296       this.replicationManager = replicationManager;
297       this.replicationSink = replicationSink;
298     }
299 
300     @Override
301     public void run() {
302       printStats(this.replicationManager.getStats());
303       printStats(this.replicationSink.getStats());
304     }
305 
306     private void printStats(String stats) {
307       if (!stats.isEmpty()) {
308         LOG.info(stats);
309       }
310     }
311   }
312 }