/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;

/**
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * <p/>
 * The replication process currently waits for the edits to be applied
 * before the call returns. This means that the replication of edits
 * is synchronous (after reading from WALs in ReplicationSource) and that a
 * single region server cannot receive edits from two sources at the same time.
 * <p/>
 * This class uses the native HBase client in order to replicate entries.
 * <p/>
 *
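 * A minimal usage sketch (the surrounding RPC plumbing is elided; the
 * variable names here are illustrative only, not part of this class):
 * <pre>{@code
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * // entries and their cells arrive together in a replicateWALEntry RPC
 * sink.replicateEntries(entries, cellScanner);
 * LOG.info(sink.getStats());
 * }</pre>
 *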
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();

  /**
   * Create a sink for replication
   *
   * @param conf    conf object; it is copied and then decorated for replication use
   * @param stopper stoppable that signals shutdown (currently unused by the sink)
   * @throws IOException if the sink cannot be initialized
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
  }

  /**
   * Decorate the Configuration object to make replication more receptive to delays:
   * lessen the operation timeout and the number of retries.
   */
  private void decorateConf() {
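    // Use fewer retries and a shorter operation timeout than the usual client
    // defaults so a struggling sink fails fast instead of holding the source's
    // RPC open; both values can be tuned via the replication.sink.client.* keys.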
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }

  /**
   * Replicate this array of entries directly into the local cluster using the native client.
   * Only operates on the raw protobuf types, saving a conversion from pb to pojo.
   * @param entries the WAL entries to apply; each carries only the count of its cells
   * @param cells scanner over the cells that accompany the entries, in the same order
   * @throws IOException if applying any batch of edits fails
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
    if (entries.isEmpty()) return;
    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id; we only want to flushCommits
      // once per invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        Cell previousCell = null;
        Mutation m = null;
        int count = entry.getAssociatedCellCount();
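        // The WALEntry itself carries only the cell count; the cell data rides
        // alongside in the CellScanner, so the two are walked in lockstep.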
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          if (isNewRowOrType(previousCell, cell)) {
            // Create new mutation
            m = CellUtil.isDelete(cell) ?
                new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) :
                new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
            List<UUID> clusterIds = new ArrayList<UUID>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
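            // Carry the ids of every cluster that has already seen this edit
            // so cyclic (e.g. master-master) setups do not replicate it back.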
            m.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, m);
          }
          if (CellUtil.isDelete(cell)) {
            ((Delete) m).addDeleteMarker(cell);
          } else {
            ((Put) m).add(cell);
          }
          previousCell = cell;
        }
        totalReplicated++;
      }
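      // Flush the buffered mutations, one batch per table and cluster-id group.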
      for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
        batch(entry.getKey(), entry.getValue().values());
      }
      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  /**
   * @param previousCell the last cell processed, possibly null
   * @param cell the cell being examined
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRow(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a two-level map of lists, i.e. a multimap
   * keyed by two keys.
   * TODO: Make a general utility method
   * @param map the map to add to
   * @param key1 first-level key
   * @param key2 second-level key
   * @param value the value to append
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2,
      V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services: close the shared connection, if one
   * was opened. It is called when the region server is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations, creating the shared connection on first use.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if any batch fails
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      // See https://en.wikipedia.org/wiki/Double-checked_locking
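      // Read the volatile field into a local once, so the fast path pays a
      // single volatile read and never takes the lock after initialization.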
      Connection connection = this.sharedHtableCon;
      if (connection == null) {
        synchronized (sharedHtableConLock) {
          connection = this.sharedHtableCon;
          if (connection == null) {
            connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
          }
        }
      }
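      // Table instances obtained from a shared Connection are lightweight;
      // take one per call and close it in the finally block below.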
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows);
      }
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edit count and the age in ms
   * of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
        "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
        ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}