
/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;

/**
 * <p>
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * </p><p>
 * The replication process currently waits for the edits to be applied
 * before returning. This means that the application of edits is synchronous
 * (after the edits are read from WALs in ReplicationSource) and that a
 * single region server cannot receive edits from two sources at the same time.
 * </p><p>
 * This class uses the native HBase client in order to replicate entries.
 * </p>
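 * <p>
 * A minimal usage sketch (hypothetical; {@code conf}, {@code stopper},
 * {@code entries} and {@code cells} are assumed to be supplied by the caller,
 * as the region server's replication service does):
 * </p>
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * sink.replicateEntries(entries, cells); // applies edits, batched per table
 * String stats = sink.getStats();        // "" until something has replicated
 * </pre>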
 *
 * TODO make this class more like ReplicationSource wrt log handling
 */
@InterfaceAudience.Private
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  private final Configuration conf;
  // Volatile because of note in here -- look for double-checked locking:
  // http://www.oracle.com/technetwork/articles/javase/bloch-effective-08-qa-140880.html
  private volatile Connection sharedHtableCon;
  private final MetricsSink metrics;
  private final AtomicLong totalReplicatedEdits = new AtomicLong();
  private final Object sharedHtableConLock = new Object();

  /**
   * Create a sink for replication.
   *
   * @param conf    conf object
   * @param stopper the Stoppable used to tell this sink to stop
   * @throws IOException if HDFS goes bad or on a bad file name
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.metrics = new MetricsSink();
  }

  /**
   * Decorate the Configuration object to make replication more receptive to
   * delays: lessen the operation timeout and the number of retries.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
    String replicationCodec = this.conf.get(HConstants.REPLICATION_CODEC_CONF_KEY);
    if (StringUtils.isNotEmpty(replicationCodec)) {
      this.conf.set(HConstants.RPC_CODEC_CONF_KEY, replicationCodec);
    }
  }
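
  // A hedged configuration sketch (the Configuration instance and values are
  // illustrative, not recommendations): because the constructor copies the
  // conf it is given, pre-setting these keys before constructing the sink
  // overrides the fallbacks decorateConf() uses.
  //
  //   Configuration conf = HBaseConfiguration.create();
  //   conf.setInt("replication.sink.client.retries.number", 4);
  //   conf.setInt("replication.sink.client.ops.timeout", 10000);
  //   ReplicationSink sink = new ReplicationSink(conf, stopper);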

  /**
   * Replicate this array of entries directly into the local cluster using the native client. Only
   * operates against raw protobuf type, saving on a conversion from pb to pojo.
   * @param entries the WAL entries to apply to the local cluster
   * @param cells a CellScanner over the cells that accompany the entries
   * @throws IOException if applying the edits to the local cluster fails
   */
  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
    if (entries.isEmpty()) return;
    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of Rows, grouped by cluster id, we only want to flushCommits once per
      // invocation of this method per table and cluster id.
      Map<TableName, Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<TableName, Map<List<UUID>, List<Row>>>();
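      // Illustrative shape of the grouping built below (table names and
      // cluster ids are hypothetical):
      //   rowMap = { table1 -> { [clusterA]           -> [put1, put2],
      //                          [clusterA, clusterB] -> [delete1] },
      //              table2 -> { [clusterA]           -> [put3] } }
      // Each inner List<Row> is later handed to batch() as one unit per table.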
      for (WALEntry entry : entries) {
        TableName table =
            TableName.valueOf(entry.getKey().getTableName().toByteArray());
        Cell previousCell = null;
        Mutation m = null;
        int count = entry.getAssociatedCellCount();
        for (int i = 0; i < count; i++) {
          // Throw index out of bounds if our cell count is off
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current();
          if (isNewRowOrType(previousCell, cell)) {
            // Create new mutation
            m = CellUtil.isDelete(cell) ?
              new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()) :
              new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
            List<UUID> clusterIds = new ArrayList<UUID>();
            for (HBaseProtos.UUID clusterId : entry.getKey().getClusterIdsList()) {
              clusterIds.add(toUUID(clusterId));
            }
            m.setClusterIds(clusterIds);
            addToHashMultiMap(rowMap, table, clusterIds, m);
          }
          if (CellUtil.isDelete(cell)) {
            ((Delete) m).addDeleteMarker(cell);
          } else {
            ((Put) m).add(cell);
          }
          previousCell = cell;
        }
        totalReplicated++;
      }
      for (Entry<TableName, Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
        batch(entry.getKey(), entry.getValue().values());
      }
      int size = entries.size();
      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
      this.metrics.applyBatch(size);
      this.totalReplicatedEdits.addAndGet(totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  /**
   * @param previousCell the previously scanned cell, or null if this is the first
   * @param cell the cell currently being scanned
   * @return True if we have crossed over onto a new row or type
   */
  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
        !CellUtil.matchingRow(previousCell, cell);
  }

  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
  }

  /**
   * Simple helper to add a value to a map of key to (a list of) values
   * TODO: Make a general utility method
   * @param map the two-level map to add to
   * @param key1 the outer key
   * @param key2 the inner key
   * @param value the value to append to the list stored under key1/key2
   * @return the list of values corresponding to key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }
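
  // Illustrative call (this mirrors the use in replicateEntries above):
  //   addToHashMultiMap(rowMap, table, clusterIds, m);
  // appends m to rowMap.get(table).get(clusterIds), creating the intermediate
  // HashMap and ArrayList on first use.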

  /**
   * Stop this sink's services: close the shared connection to the local
   * cluster, if one was opened. It is called when the regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      if (this.sharedHtableCon != null) {
        synchronized (sharedHtableConLock) {
          if (this.sharedHtableCon != null) {
            this.sharedHtableCon.close();
            this.sharedHtableCon = null;
          }
        }
      }
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the changes to the specified table, managing the shared connection.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if a batch cannot be applied to the table
   */
  protected void batch(TableName tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    Table table = null;
    try {
      // See https://en.wikipedia.org/wiki/Double-checked_locking
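      // Read the volatile field once into a local so the null check and the
      // subsequent use see the same reference, even if another thread nulls
      // the field out via stopReplicationSinkServices() in between.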
      Connection connection = this.sharedHtableCon;
      if (connection == null) {
        synchronized (sharedHtableConLock) {
          connection = this.sharedHtableCon;
          if (connection == null) {
            connection = this.sharedHtableCon = ConnectionFactory.createConnection(this.conf);
          }
        }
      }
      table = connection.getTable(tableName);
      for (List<Row> rows : allRows) {
        table.batch(rows);
      }
    } catch (InterruptedException ix) {
      throw (InterruptedIOException) new InterruptedIOException().initCause(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }

  /**
   * Get a string representation of this sink's metrics
   * @return string with the total replicated edits count and the age in ms
   * of the last edit that was applied
   */
  public String getStats() {
    return this.totalReplicatedEdits.get() == 0 ? "" : "Sink: " +
      "age in ms of last applied edit: " + this.metrics.refreshAgeOfLastAppliedOp() +
      ", total replicated edits: " + this.totalReplicatedEdits;
  }

  /**
   * Get replication Sink Metrics
   * @return MetricsSink
   */
  public MetricsSink getSinkMetrics() {
    return this.metrics;
  }
}