View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.IOException;
22  import java.util.ArrayList;
23  import java.util.HashMap;
24  import java.util.List;
25  import java.util.Map;
26  import java.util.concurrent.atomic.AtomicLong;
27  
28  import org.apache.commons.logging.Log;
29  import org.apache.commons.logging.LogFactory;
30  import org.apache.hadoop.hbase.classification.InterfaceAudience;
31  import org.apache.hadoop.conf.Configuration;
32  import org.apache.hadoop.hbase.Cell;
33  import org.apache.hadoop.hbase.CellScanner;
34  import org.apache.hadoop.hbase.HConstants;
35  import org.apache.hadoop.hbase.HRegionInfo;
36  import org.apache.hadoop.hbase.HRegionLocation;
37  import org.apache.hadoop.hbase.TableName;
38  import org.apache.hadoop.hbase.client.HConnection;
39  import org.apache.hadoop.hbase.client.RegionServerCallable;
40  import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
41  import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
42  import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
44  import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
45  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
46  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
47  import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  import org.apache.hadoop.hbase.util.Pair;
50  import org.apache.hadoop.hbase.wal.WAL.Entry;
51  
52  import com.google.protobuf.ServiceException;
53  
54  /**
55   * This class is responsible for replaying the edits coming from a failed region server.
56   * <p>
57   * This class uses the native HBase client in order to replay WAL entries.
58   * </p>
59   */
60  @InterfaceAudience.Private
61  public class WALEditsReplaySink {
62  
63    private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
64    private static final int MAX_BATCH_SIZE = 1024;
65  
66    private final Configuration conf;
67    private final HConnection conn;
68    private final TableName tableName;
69    private final MetricsWALEditsReplay metrics;
70    private final AtomicLong totalReplayedEdits = new AtomicLong();
71    private final boolean skipErrors;
72    private final int replayTimeout;
73    private RpcControllerFactory rpcControllerFactory;
74  
75    /**
76     * Create a sink for WAL log entries replay
77     * @param conf
78     * @param tableName
79     * @param conn
80     * @throws IOException
81     */
82    public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn)
83        throws IOException {
84      this.conf = conf;
85      this.metrics = new MetricsWALEditsReplay();
86      this.conn = conn;
87      this.tableName = tableName;
88      this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
89        HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
90      // a single replay operation time out and default is 60 seconds
91      this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
92      this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
93    }
94  
95    /**
96     * Replay an array of actions of the same region directly into the newly assigned Region Server
97     * @param entries
98     * @throws IOException
99     */
100   public void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException {
101     if (entries.size() == 0) {
102       return;
103     }
104 
105     int batchSize = entries.size();
106     Map<HRegionInfo, List<Entry>> entriesByRegion =
107         new HashMap<HRegionInfo, List<Entry>>();
108     HRegionLocation loc = null;
109     Entry entry = null;
110     List<Entry> regionEntries = null;
111     // Build the action list.
112     for (int i = 0; i < batchSize; i++) {
113       loc = entries.get(i).getFirst();
114       entry = entries.get(i).getSecond();
115       if (entriesByRegion.containsKey(loc.getRegionInfo())) {
116         regionEntries = entriesByRegion.get(loc.getRegionInfo());
117       } else {
118         regionEntries = new ArrayList<Entry>();
119         entriesByRegion.put(loc.getRegionInfo(), regionEntries);
120       }
121       regionEntries.add(entry);
122     }
123 
124     long startTime = EnvironmentEdgeManager.currentTime();
125 
126     // replaying edits by region
127     for (Map.Entry<HRegionInfo, List<Entry>> _entry : entriesByRegion.entrySet()) {
128       HRegionInfo curRegion = _entry.getKey();
129       List<Entry> allActions = _entry.getValue();
130       // send edits in chunks
131       int totalActions = allActions.size();
132       int replayedActions = 0;
133       int curBatchSize = 0;
134       for (; replayedActions < totalActions;) {
135         curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
136                 : (totalActions - replayedActions);
137         replayEdits(loc, curRegion, allActions.subList(replayedActions,
138           replayedActions + curBatchSize));
139         replayedActions += curBatchSize;
140       }
141     }
142 
143     long endTime = EnvironmentEdgeManager.currentTime() - startTime;
144     LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime
145         + "(ms)!");
146 
147     metrics.updateReplayTime(endTime);
148     metrics.updateReplayBatchSize(batchSize);
149 
150     this.totalReplayedEdits.addAndGet(batchSize);
151   }
152 
153   /**
154    * Get a string representation of this sink's metrics
155    * @return string with the total replayed edits count
156    */
157   public String getStats() {
158     return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
159         + this.totalReplayedEdits;
160   }
161 
162   private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
163       final List<Entry> entries) throws IOException {
164     try {
165       RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
166       ReplayServerCallable<ReplicateWALEntryResponse> callable =
167           new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
168               regionInfo, entries);
169       factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
170     } catch (IOException ie) {
171       if (skipErrors) {
172         LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
173             + "=true so continuing replayEdits with error:" + ie.getMessage());
174       } else {
175         throw ie;
176       }
177     }
178   }
179 
180   /**
181    * Callable that handles the <code>replay</code> method call going against a single regionserver
182    * @param <R>
183    */
184   class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
185     private HRegionInfo regionInfo;
186     private List<Entry> entries;
187 
188     ReplayServerCallable(final HConnection connection, final TableName tableName,
189         final HRegionLocation regionLoc, final HRegionInfo regionInfo,
190         final List<Entry> entries) {
191       super(connection, tableName, null);
192       this.entries = entries;
193       this.regionInfo = regionInfo;
194       setLocation(regionLoc);
195     }
196 
197     @Override
198     public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
199       try {
200         replayToServer(this.regionInfo, this.entries);
201       } catch (ServiceException se) {
202         throw ProtobufUtil.getRemoteException(se);
203       }
204       return null;
205     }
206 
207     private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
208         throws IOException, ServiceException {
209       if (entries.isEmpty()) return;
210 
211       Entry[] entriesArray = new Entry[entries.size()];
212       entriesArray = entries.toArray(entriesArray);
213       AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
214 
215       Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
216           ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
217       PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
218       try {
219         remoteSvr.replay(controller, p.getFirst());
220       } catch (ServiceException se) {
221         throw ProtobufUtil.getRemoteException(se);
222       }
223     }
224 
225     @Override
226     public void prepare(boolean reload) throws IOException {
227       if (!reload) return;
228       // relocate regions in case we have a new dead server or network hiccup
229       // if not due to connection issue, the following code should run fast because it uses
230       // cached location
231       boolean skip = false;
232       for (Entry entry : this.entries) {
233         WALEdit edit = entry.getEdit();
234         List<Cell> cells = edit.getCells();
235         for (Cell cell : cells) {
236           // filtering WAL meta entries
237           setLocation(conn.locateRegion(tableName, cell.getRow()));
238           skip = true;
239           break;
240         }
241         // use first log entry to relocate region because all entries are for one region
242         if (skip) break;
243       }
244     }
245   }
246 }