/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.StoreDescriptor;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;

/**
 * <p>
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * </p><p>
 * The replication process currently waits for the edits to be applied
 * before returning. This means that replication of edits
 * is synchronous (after reading from WALs in ReplicationSource) and that a
 * single region server cannot receive edits from two sources at the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
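 * <p>
 * A minimal usage sketch (the surrounding region server normally drives this; the
 * {@code stopper}, {@code entries} and {@code cells} values below are illustrative):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * sink.replicateEntries(entries, cells, replicationClusterId,
 *     sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
 * sink.stopReplicationSinkServices();
 * </pre>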
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();
  // Number of hfiles that we successfully replicated
  private long hfilesReplicated = 0;
  private SourceFSConfigurationProvider provider;

  /**
   * Create a sink for replication
   *
   * @param conf    conf object
   * @param stopper the stoppable used to signal that this sink should stop
   * @throws IOException thrown when HDFS goes bad or on a bad file name
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();

    String className =
        conf.get("hbase.replication.source.fs.conf.provider",
          DefaultSourceFSConfigurationProvider.class.getCanonicalName());
    try {
      @SuppressWarnings("rawtypes")
      Class c = Class.forName(className);
      this.provider = (SourceFSConfigurationProvider) c.newInstance();
    } catch (Exception e) {
      throw new IllegalArgumentException("Configured source fs configuration provider class "
          + className + " could not be instantiated.", e);
    }
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the timeout and the number of retries.
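   * With the defaults set below, the sink retries a failed apply up to 4 times with a
   * 10 second operation timeout; both can be overridden via
   * "replication.sink.client.retries.number" and "replication.sink.client.ops.timeout".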
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type, saving on a conversion from pb to pojo.
   * @param entries the WAL entries to replicate
   * @param cells cell scanner over the cells associated with the entries
   * @param replicationClusterId Id which will uniquely identify source cluster FS client
   *          configurations in the replication configuration directory
   * @param sourceBaseNamespaceDirPath Path that points to the source cluster base namespace
   *          directory
   * @param sourceHFileArchiveDirPath Path that points to the source cluster hfile archive directory
   * @throws IOException If failed to replicate the data
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells,
      String replicationClusterId, String sourceBaseNamespaceDirPath,
      String sourceHFileArchiveDirPath) throws IOException {
    if (entries.isEmpty()) return;
    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
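      // e.g. rowMap = { table1 -> { [sourceClusterId] -> [put1, put2],
      //                             [sourceClusterId, peerClusterId] -> [delete1] } }
      // (shape illustrative; the inner key is the list of cluster ids the edit has seen)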

      // Map of table name to a list of (family, list of hfile paths) pairs, with the
      // paths relative to the table's namespace directory
      Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap = null;

      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        Cell previousCell = null;
        Mutation m = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          // Handle bulk load hfiles replication
          if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
            if (bulkLoadHFileMap == null) {
              bulkLoadHFileMap = new HashMap<String, List<Pair<byte[], List<String>>>>();
            }
            buildBulkLoadHFileMap(bulkLoadHFileMap, table, cell);
          } else {
            // Handle wal replication
            if (isNewRowOrType(previousCell, cell)) {
              // Create new mutation
              m =
                  CellUtil.isDelete(cell) ? new Delete(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength()) : new Put(cell.getRowArray(), cell.getRowOffset(),
                      cell.getRowLength());
              List<UUID> clusterIds = new ArrayList<UUID>();
              for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
                clusterIds.add(toUUID(clusterId));
              }
              m.setClusterIds(clusterIds);
              addToHashMultiMap(rowMap, table, clusterIds, m);
            }
            if (CellUtil.isDelete(cell)) {
              ((Delete) m).addDeleteMarker(cell);
            } else {
              ((Put) m).add(cell);
            }
            previousCell = cell;
          }
        }
        totalReplicated++;
      }

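      // At this point rowMap holds the grouped mutations and bulkLoadHFileMap
      // (if non-null) the bulk load events collected above; apply both below.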
      // TODO Replicating mutations and bulk loaded data can be made parallel
      if (!rowMap.isEmpty()) {
        LOG.debug("Started replicating mutations.");
        for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
          batch(entry.getKey(), entry.getValue().values());
        }
        LOG.debug("Finished replicating mutations.");
      }

      if (bulkLoadHFileMap != null && !bulkLoadHFileMap.isEmpty()) {
        LOG.debug("Started replicating bulk loaded data.");
        HFileReplicator hFileReplicator =
            new HFileReplicator(this.provider.getConf(this.conf, replicationClusterId),
                sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath, bulkLoadHFileMap, conf,
                getConnection());
        hFileReplicator.replicate();
        LOG.debug("Finished replicating bulk loaded data.");
      }

      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size + hfilesReplicated, hfilesReplicated);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  private void buildBulkLoadHFileMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, TableName table,
      Cell cell) throws IOException {
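    // bulkLoadHFileMap is keyed by the table name with namespace included
    // (e.g. "ns:table") and lists, per family, the hfile paths to fetch.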
    BulkLoadDescriptor bld = WALEdit.getBulkLoadDescriptor(cell);
    List<StoreDescriptor> storesList = bld.getStoresList();
    int storesSize = storesList.size();
    for (int j = 0; j < storesSize; j++) {
      StoreDescriptor storeDescriptor = storesList.get(j);
      List<String> storeFileList = storeDescriptor.getStoreFileList();
      int storeFilesSize = storeFileList.size();
      hfilesReplicated += storeFilesSize;
      for (int k = 0; k < storeFilesSize; k++) {
        byte[] family = storeDescriptor.getFamilyName().toByteArray();

        // Build hfile relative path from its namespace
        String pathToHfileFromNS = getHFilePath(table, bld, storeFileList.get(k), family);

        String tableName = table.getNameWithNamespaceInclAsString();
        if (bulkLoadHFileMap.containsKey(tableName)) {
          List<Pair<byte[], List<String>>> familyHFilePathsList = bulkLoadHFileMap.get(tableName);
          boolean foundFamily = false;
          for (int i = 0; i < familyHFilePathsList.size(); i++) {
            Pair<byte[], List<String>> familyHFilePathsPair = familyHFilePathsList.get(i);
            if (Bytes.equals(familyHFilePathsPair.getFirst(), family)) {
              // Found family already present, just add the path to the existing list
              familyHFilePathsPair.getSecond().add(pathToHfileFromNS);
              foundFamily = true;
              break;
            }
          }
          if (!foundFamily) {
            // Family not found, add this family and its hfile paths pair to the list
            addFamilyAndItsHFilePathToTableInMap(family, pathToHfileFromNS, familyHFilePathsList);
          }
        } else {
          // Add this table entry into the map
          addNewTableEntryInMap(bulkLoadHFileMap, family, pathToHfileFromNS, tableName);
        }
      }
    }
  }

  private void addFamilyAndItsHFilePathToTableInMap(byte[] family, String pathToHfileFromNS,
      List<Pair<byte[], List<String>>> familyHFilePathsList) {
    List<String> hfilePaths = new ArrayList<String>();
    hfilePaths.add(pathToHfileFromNS);
    familyHFilePathsList.add(new Pair<byte[], List<String>>(family, hfilePaths));
  }

  private void addNewTableEntryInMap(
      final Map<String, List<Pair<byte[], List<String>>>> bulkLoadHFileMap, byte[] family,
      String pathToHfileFromNS, String tableName) {
    List<String> hfilePaths = new ArrayList<String>();
    hfilePaths.add(pathToHfileFromNS);
    Pair<byte[], List<String>> newFamilyHFilePathsPair =
        new Pair<byte[], List<String>>(family, hfilePaths);
    List<Pair<byte[], List<String>>> newFamilyHFilePathsList =
        new ArrayList<Pair<byte[], List<String>>>();
    newFamilyHFilePathsList.add(newFamilyHFilePathsPair);
    bulkLoadHFileMap.put(tableName, newFamilyHFilePathsList);
  }

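  /**
   * Builds the path of a bulk loaded hfile relative to the source namespace directory,
   * i.e. {@code <namespace>/<table>/<encoded region name>/<family>/<store file>}.
   */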
  private String getHFilePath(TableName table, BulkLoadDescriptor bld, String storeFile,
      byte[] family) {
    return new StringBuilder(100).append(table.getNamespaceAsString()).append(Path.SEPARATOR)
        .append(table.getQualifierAsString()).append(Path.SEPARATOR)
        .append(Bytes.toString(bld.getEncodedRegionName().toByteArray())).append(Path.SEPARATOR)
        .append(Bytes.toString(family)).append(Path.SEPARATOR).append(storeFile).toString();
  }

  /**
   * A {@link Put} and a {@link Delete} cannot share a Mutation, and neither can cells for
   * different rows, so a change of row or type forces a new Mutation.
   * @param previousCell the last cell appended to the current mutation, or null
   * @param cell the cell under consideration
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRow(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a two-level multimap (a map from key1 to a map from
   * key2 to a list of values).
   * TODO: Make a general utility method
   * @param map the multimap to add to
   * @param key1 the outer key
   * @param key2 the inner key
   * @param value the value to append
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop this sink's services and close the shared connection. It is called when the
   * region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations against the given table, one batch per cluster-id grouping.
   * @param tableName table to insert into
   * @param allRows list of actions, one list per cluster-id grouping
   * @throws IOException if applying a batch fails
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      Connection connection = getConnection();
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows, null);
      }
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  private Connection getConnection() throws IOException {
    // See https://en.wikipedia.org/wiki/Double-checked_locking
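    // Reading the volatile field once into a local means the common path does a
    // single volatile read (the standard double-checked locking idiom).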
    Connection connection = sharedHtableCon;
    if (connection == null) {
      synchronized (sharedHtableConLock) {
        connection = sharedHtableCon;
        if (connection == null) {
          connection = sharedHtableCon = ConnectionFactory.createConnection(conf);
        }
      }
    }
    return connection;
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age (in ms)
   * of the last edit that was applied
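   * (e.g. "Sink: age in ms of last applied edit: 127, total replicated edits: 4096";
   * values illustrative)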
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}