/*
 * Copyright 2010 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.replication.regionserver;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;

/**
 * This class is responsible for replicating the edits coming
 * from another cluster.
 * <p/>
 * Replication is currently synchronous: replicateEntries() does not return
 * until the edits have been applied. Because edits are applied in order
 * (after being read from HLogs by ReplicationSource), a single region server
 * cannot receive edits from two sources at the same time.
 * <p/>
 * This class uses the native HBase client in order to replicate entries.
 * <p/>
 *
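 * A minimal usage sketch (illustrative only; the Configuration, Stoppable
 * and WAL entries are assumed to be supplied by the hosting region server):
 * <pre>
 * ReplicationSink sink = new ReplicationSink(conf, stopper);
 * sink.replicateEntries(entries);       // apply a batch of WAL entries
 * sink.stopReplicationSinkServices();   // release the shared connection
 * </pre>
 *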
 * TODO make this class more like ReplicationSource wrt log handling
 */
public class ReplicationSink {

  private static final Log LOG = LogFactory.getLog(ReplicationSink.class);
  // Name of the HDFS directory that contains the temporary rep logs
  public static final String REPLICATION_LOG_DIR = ".replogs";
  private final Configuration conf;
  private final HConnection sharedHtableCon;
  private final ReplicationSinkMetrics metrics;

  /**
   * Create a sink for replication
   *
   * @param conf    conf object
   * @param stopper the Stoppable used to signal that this sink should stop
   * @throws IOException thrown when the connection to the cluster cannot be set up
   */
  public ReplicationSink(Configuration conf, Stoppable stopper)
      throws IOException {
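    // Work on a copy of the configuration so the client tweaks made in
    // decorateConf() do not leak into the server-wide Configuration.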
    this.conf = HBaseConfiguration.create(conf);
    decorateConf();
    this.sharedHtableCon = HConnectionManager.createConnection(this.conf);
    this.metrics = new ReplicationSinkMetrics();
  }

  /**
   * Decorate the Configuration object to make replication more receptive to
   * delays: lower the number of retries and the operation timeout so that
   * failures surface quickly instead of blocking the sink.
   */
  private void decorateConf() {
    this.conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
        this.conf.getInt("replication.sink.client.retries.number", 4));
    this.conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
        this.conf.getInt("replication.sink.client.ops.timeout", 10000));
  }

  /**
   * Replicate this array of entries directly into the local cluster
   * using the native client.
   *
   * @param entries the WAL entries to apply to the local cluster
   * @throws IOException if the edits cannot be applied
   */
  public void replicateEntries(HLog.Entry[] entries)
      throws IOException {
    if (entries.length == 0) {
      return;
    }
    // Very simple optimization where we batch sequences of rows going
    // to the same table.
    try {
      long totalReplicated = 0;
      // Map of table => list of cluster ids => list of Rows. Grouping by the
      // clusters that have consumed the change means we flushCommits at most
      // once per table and cluster combination per invocation of this method.
      Map<byte[], Map<List<UUID>, List<Row>>> rowMap =
          new TreeMap<byte[], Map<List<UUID>, List<Row>>>(Bytes.BYTES_COMPARATOR);
      for (HLog.Entry entry : entries) {
        WALEdit edit = entry.getEdit();
        byte[] table = entry.getKey().getTablename();
        Put put = null;
        Delete del = null;
        KeyValue lastKV = null;
        List<KeyValue> kvs = edit.getKeyValues();
        for (KeyValue kv : kvs) {
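          // Start a new Put or Delete whenever the row or the KeyValue type
          // changes; consecutive KVs of the same row and type are folded
          // into the current mutation.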
          if (lastKV == null || lastKV.getType() != kv.getType()
              || !lastKV.matchingRow(kv)) {
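            // Track the clusters that have already seen this edit so that
            // cyclic (master-master) replication does not replay it back
            // to its origin.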
            UUID clusterId = entry.getKey().getClusterId();
            List<UUID> clusterIds = edit.getClusterIds();
            if (kv.isDelete()) {
              del = new Delete(kv.getRow());
              del.setClusterId(clusterId);
              del.setClusterIds(clusterIds);
              clusterIds.add(clusterId);
              addToHashMultiMap(rowMap, table, clusterIds, del);
            } else {
              put = new Put(kv.getRow());
              put.setClusterId(clusterId);
              put.setClusterIds(clusterIds);
              clusterIds.add(clusterId);
              addToHashMultiMap(rowMap, table, clusterIds, put);
            }
          }
          if (kv.isDelete()) {
            del.addDeleteMarker(kv);
          } else {
            put.add(kv);
          }
          lastKV = kv;
        }
        totalReplicated++;
      }
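      // Flush the buffered mutations, one table at a time.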
      for (Map.Entry<byte[], Map<List<UUID>, List<Row>>> entry : rowMap.entrySet()) {
        batch(entry.getKey(), entry.getValue().values());
      }
      this.metrics.setAgeOfLastAppliedOp(
          entries[entries.length - 1].getKey().getWriteTime());
      this.metrics.appliedBatchesRate.inc(1);
      LOG.info("Total replicated: " + totalReplicated);
    } catch (IOException ex) {
      LOG.error("Unable to accept edit because:", ex);
      throw ex;
    }
  }

  /**
   * Simple helper that appends a value to the list of values stored under
   * the pair (key1, key2), creating the inner map and list as needed.
   * TODO: Make a general utility method
   * @param map the two-level map to add to
   * @param key1 the outer key (here, the table name)
   * @param key2 the inner key (here, the list of consuming cluster ids)
   * @param value the value to append
   * @return the list of values for the combination of key1 and key2
   */
  private <K1, K2, V> List<V> addToHashMultiMap(
      Map<K1, Map<K2, List<V>>> map, K1 key1, K2 key2, V value) {
    Map<K2, List<V>> innerMap = map.get(key1);
    if (innerMap == null) {
      innerMap = new HashMap<K2, List<V>>();
      map.put(key1, innerMap);
    }
    List<V> values = innerMap.get(key2);
    if (values == null) {
      values = new ArrayList<V>();
      innerMap.put(key2, values);
    }
    values.add(value);
    return values;
  }

  /**
   * Stop the replication sink services and close the shared connection.
   * It is called when the regionserver is stopped.
   */
  public void stopReplicationSinkServices() {
    try {
      this.sharedHtableCon.close();
    } catch (IOException e) {
      LOG.warn("IOException while closing the connection", e); // ignoring as we are closing.
    }
  }

  /**
   * Apply the given mutations to the named table, one batch call per
   * group of rows, reusing the shared connection.
   * @param tableName table to insert into
   * @param allRows list of actions
   * @throws IOException if a batch cannot be applied
   */
  protected void batch(byte[] tableName, Collection<List<Row>> allRows) throws IOException {
    if (allRows.isEmpty()) {
      return;
    }
    HTableInterface table = null;
    try {
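      // getTable() hands out a lightweight table backed by the shared
      // connection; close() in the finally block releases the table only,
      // not the connection itself.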
      table = this.sharedHtableCon.getTable(tableName);
      for (List<Row> rows : allRows) {
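        // Each inner list is one cluster-id grouping; batch() submits its
        // Puts and Deletes together and throws if an operation fails.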
        table.batch(rows);
        this.metrics.appliedOpsRate.inc(rows.size());
      }
    } catch (InterruptedException ix) {
      throw new IOException(ix);
    } finally {
      if (table != null) {
        table.close();
      }
    }
  }
}