1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.regionserver.wal;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.List;
25 import java.util.Map;
26 import java.util.concurrent.atomic.AtomicLong;
27
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.hbase.classification.InterfaceAudience;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.hbase.Cell;
33 import org.apache.hadoop.hbase.CellScanner;
34 import org.apache.hadoop.hbase.HConstants;
35 import org.apache.hadoop.hbase.HRegionInfo;
36 import org.apache.hadoop.hbase.HRegionLocation;
37 import org.apache.hadoop.hbase.TableName;
38 import org.apache.hadoop.hbase.client.HConnection;
39 import org.apache.hadoop.hbase.client.RegionServerCallable;
40 import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
41 import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
42 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
43 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
44 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
45 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
46 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
47 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49 import org.apache.hadoop.hbase.util.Pair;
50 import org.apache.hadoop.hbase.wal.WAL.Entry;
51
52 import com.google.protobuf.ServiceException;
53
54
55
56
57
58
59
60 @InterfaceAudience.Private
61 public class WALEditsReplaySink {
62
63 private static final Log LOG = LogFactory.getLog(WALEditsReplaySink.class);
64 private static final int MAX_BATCH_SIZE = 1024;
65
66 private final Configuration conf;
67 private final HConnection conn;
68 private final TableName tableName;
69 private final MetricsWALEditsReplay metrics;
70 private final AtomicLong totalReplayedEdits = new AtomicLong();
71 private final boolean skipErrors;
72 private final int replayTimeout;
73 private RpcControllerFactory rpcControllerFactory;
74
75
76
77
78
79
80
81
82 public WALEditsReplaySink(Configuration conf, TableName tableName, HConnection conn)
83 throws IOException {
84 this.conf = conf;
85 this.metrics = new MetricsWALEditsReplay();
86 this.conn = conn;
87 this.tableName = tableName;
88 this.skipErrors = conf.getBoolean(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS,
89 HConstants.DEFAULT_HREGION_EDITS_REPLAY_SKIP_ERRORS);
90
91 this.replayTimeout = conf.getInt("hbase.regionserver.logreplay.timeout", 60000);
92 this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
93 }
94
95
96
97
98
99
100 public void replayEntries(List<Pair<HRegionLocation, Entry>> entries) throws IOException {
101 if (entries.size() == 0) {
102 return;
103 }
104
105 int batchSize = entries.size();
106 Map<HRegionInfo, List<Entry>> entriesByRegion =
107 new HashMap<HRegionInfo, List<Entry>>();
108 HRegionLocation loc = null;
109 Entry entry = null;
110 List<Entry> regionEntries = null;
111
112 for (int i = 0; i < batchSize; i++) {
113 loc = entries.get(i).getFirst();
114 entry = entries.get(i).getSecond();
115 if (entriesByRegion.containsKey(loc.getRegionInfo())) {
116 regionEntries = entriesByRegion.get(loc.getRegionInfo());
117 } else {
118 regionEntries = new ArrayList<Entry>();
119 entriesByRegion.put(loc.getRegionInfo(), regionEntries);
120 }
121 regionEntries.add(entry);
122 }
123
124 long startTime = EnvironmentEdgeManager.currentTime();
125
126
127 for (Map.Entry<HRegionInfo, List<Entry>> _entry : entriesByRegion.entrySet()) {
128 HRegionInfo curRegion = _entry.getKey();
129 List<Entry> allActions = _entry.getValue();
130
131 int totalActions = allActions.size();
132 int replayedActions = 0;
133 int curBatchSize = 0;
134 for (; replayedActions < totalActions;) {
135 curBatchSize = (totalActions > (MAX_BATCH_SIZE + replayedActions)) ? MAX_BATCH_SIZE
136 : (totalActions - replayedActions);
137 replayEdits(loc, curRegion, allActions.subList(replayedActions,
138 replayedActions + curBatchSize));
139 replayedActions += curBatchSize;
140 }
141 }
142
143 long endTime = EnvironmentEdgeManager.currentTime() - startTime;
144 LOG.debug("number of rows:" + entries.size() + " are sent by batch! spent " + endTime
145 + "(ms)!");
146
147 metrics.updateReplayTime(endTime);
148 metrics.updateReplayBatchSize(batchSize);
149
150 this.totalReplayedEdits.addAndGet(batchSize);
151 }
152
153
154
155
156
157 public String getStats() {
158 return this.totalReplayedEdits.get() == 0 ? "" : "Sink: total replayed edits: "
159 + this.totalReplayedEdits;
160 }
161
162 private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo regionInfo,
163 final List<Entry> entries) throws IOException {
164 try {
165 RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(conf, null);
166 ReplayServerCallable<ReplicateWALEntryResponse> callable =
167 new ReplayServerCallable<ReplicateWALEntryResponse>(this.conn, this.tableName, regionLoc,
168 regionInfo, entries);
169 factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, this.replayTimeout);
170 } catch (IOException ie) {
171 if (skipErrors) {
172 LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
173 + "=true so continuing replayEdits with error:" + ie.getMessage());
174 } else {
175 throw ie;
176 }
177 }
178 }
179
180
181
182
183
184 class ReplayServerCallable<R> extends RegionServerCallable<ReplicateWALEntryResponse> {
185 private HRegionInfo regionInfo;
186 private List<Entry> entries;
187
188 ReplayServerCallable(final HConnection connection, final TableName tableName,
189 final HRegionLocation regionLoc, final HRegionInfo regionInfo,
190 final List<Entry> entries) {
191 super(connection, tableName, null);
192 this.entries = entries;
193 this.regionInfo = regionInfo;
194 setLocation(regionLoc);
195 }
196
197 @Override
198 public ReplicateWALEntryResponse call(int callTimeout) throws IOException {
199 try {
200 replayToServer(this.regionInfo, this.entries);
201 } catch (ServiceException se) {
202 throw ProtobufUtil.getRemoteException(se);
203 }
204 return null;
205 }
206
207 private void replayToServer(HRegionInfo regionInfo, List<Entry> entries)
208 throws IOException, ServiceException {
209 if (entries.isEmpty()) return;
210
211 Entry[] entriesArray = new Entry[entries.size()];
212 entriesArray = entries.toArray(entriesArray);
213 AdminService.BlockingInterface remoteSvr = conn.getAdmin(getLocation().getServerName());
214
215 Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
216 ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray);
217 PayloadCarryingRpcController controller = rpcControllerFactory.newController(p.getSecond());
218 try {
219 remoteSvr.replay(controller, p.getFirst());
220 } catch (ServiceException se) {
221 throw ProtobufUtil.getRemoteException(se);
222 }
223 }
224
225 @Override
226 public void prepare(boolean reload) throws IOException {
227 if (!reload) return;
228
229
230
231 boolean skip = false;
232 for (Entry entry : this.entries) {
233 WALEdit edit = entry.getEdit();
234 List<Cell> cells = edit.getCells();
235 for (Cell cell : cells) {
236
237 setLocation(conn.locateRegion(tableName, cell.getRow()));
238 skip = true;
239 break;
240 }
241
242 if (skip) break;
243 }
244 }
245 }
246 }