1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.apache.hadoop.hbase.regionserver.wal;
19
20 import java.util.ArrayList;
21 import java.util.Collections;
22 import java.util.HashMap;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.TreeMap;
27 import java.util.concurrent.ConcurrentMap;
28 import java.util.concurrent.ConcurrentSkipListMap;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.hbase.HConstants;
33 import org.apache.hadoop.hbase.util.Bytes;
34
35 import com.google.common.collect.Maps;
36
37
38
39
40
41
42 class SequenceIdAccounting {
43 private static final Log LOG = LogFactory.getLog(SequenceIdAccounting.class);
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 private final Object tieLock = new Object();
61
62
63
64
65
66
67
68
69
70
71
72
73 private final ConcurrentMap<byte[], ConcurrentMap<byte[], Long>> lowestUnflushedSequenceIds
74 = new ConcurrentSkipListMap<byte[], ConcurrentMap<byte[], Long>>(
75 Bytes.BYTES_COMPARATOR);
76
77
78
79
80
81
82
83 private final Map<byte[], Map<byte[], Long>> flushingSequenceIds =
84 new TreeMap<byte[], Map<byte[], Long>>(Bytes.BYTES_COMPARATOR);
85
86
87
88
89
90
91
92
93
94 private Map<byte[], Long> highestSequenceIds = new HashMap<byte[], Long>();
95
96
97
98
99
100
101
102 long getLowestSequenceId(final byte [] encodedRegionName) {
103 synchronized (this.tieLock) {
104 Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
105 long flushingLowest = m != null? getLowestSequenceId(m): Long.MAX_VALUE;
106 m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
107 long unflushedLowest = m != null? getLowestSequenceId(m): HConstants.NO_SEQNUM;
108 return Math.min(flushingLowest, unflushedLowest);
109 }
110 }
111
112
113
114
115
116
117
118 long getLowestSequenceId(final byte [] encodedRegionName, final byte [] familyName) {
119 synchronized (this.tieLock) {
120 Map<byte[], Long> m = this.flushingSequenceIds.get(encodedRegionName);
121 if (m != null) {
122 Long lowest = m.get(familyName);
123 if (lowest != null) return lowest;
124 }
125 m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
126 if (m != null) {
127 Long lowest = m.get(familyName);
128 if (lowest != null) return lowest;
129 }
130 }
131 return HConstants.NO_SEQNUM;
132 }
133
134
135
136
137
138
139 Map<byte[], Long> resetHighest() {
140 Map<byte[], Long> old = this.highestSequenceIds;
141 this.highestSequenceIds = new HashMap<byte[], Long>();
142 return old;
143 }
144
145
146
147
148
149
150
151
152
153
154 void update(byte[] encodedRegionName, Set<byte[]> families, long sequenceid,
155 final boolean lowest) {
156 Long l = Long.valueOf(sequenceid);
157 this.highestSequenceIds.put(encodedRegionName, l);
158 if (lowest) {
159 ConcurrentMap<byte[], Long> m = getOrCreateLowestSequenceIds(encodedRegionName);
160 for (byte[] familyName : families) {
161 m.putIfAbsent(familyName, l);
162 }
163 }
164 }
165
166 ConcurrentMap<byte[], Long> getOrCreateLowestSequenceIds(byte[] encodedRegionName) {
167
168 ConcurrentMap<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
169 if (m != null) return m;
170 m = new ConcurrentSkipListMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
171
172 ConcurrentMap<byte[], Long> alreadyPut =
173 this.lowestUnflushedSequenceIds.putIfAbsent(encodedRegionName, m);
174 return alreadyPut == null? m : alreadyPut;
175 }
176
177
178
179
180
181 static long getLowestSequenceId(Map<byte[], Long> sequenceids) {
182 long lowest = HConstants.NO_SEQNUM;
183 for (Long sid: sequenceids.values()) {
184 if (lowest == HConstants.NO_SEQNUM || sid.longValue() < lowest) {
185 lowest = sid.longValue();
186 }
187 }
188 return lowest;
189 }
190
191
192
193
194
195
196 private <T extends Map<byte[], Long>> Map<byte[], Long> flattenToLowestSequenceId(
197 Map<byte[], T> src) {
198 if (src == null || src.isEmpty()) return null;
199 Map<byte[], Long> tgt = Maps.newHashMap();
200 for (Map.Entry<byte[], T> entry: src.entrySet()) {
201 long lowestSeqId = getLowestSequenceId(entry.getValue());
202 if (lowestSeqId != HConstants.NO_SEQNUM) {
203 tgt.put(entry.getKey(), lowestSeqId);
204 }
205 }
206 return tgt;
207 }
208
209
210
211
212
213
214
215
216
217
218 Long startCacheFlush(final byte[] encodedRegionName, final Set<byte[]> families) {
219 Map<byte[], Long> oldSequenceIds = null;
220 Long lowestUnflushedInRegion = HConstants.NO_SEQNUM;
221 synchronized (tieLock) {
222 Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(encodedRegionName);
223 if (m != null) {
224
225
226
227
228 for (byte[] familyName: families) {
229 Long seqId = m.remove(familyName);
230 if (seqId != null) {
231 if (oldSequenceIds == null) oldSequenceIds = Maps.newTreeMap(Bytes.BYTES_COMPARATOR);
232 oldSequenceIds.put(familyName, seqId);
233 }
234 }
235 if (oldSequenceIds != null && !oldSequenceIds.isEmpty()) {
236 if (this.flushingSequenceIds.put(encodedRegionName, oldSequenceIds) != null) {
237 LOG.warn("Flushing Map not cleaned up for " + Bytes.toString(encodedRegionName) +
238 ", sequenceid=" + oldSequenceIds);
239 }
240 }
241 if (m.isEmpty()) {
242
243
244
245
246 this.lowestUnflushedSequenceIds.remove(encodedRegionName);
247 } else {
248
249 lowestUnflushedInRegion = Collections.min(m.values());
250 }
251 }
252 }
253
254 if (oldSequenceIds != null && oldSequenceIds.isEmpty()) {
255
256
257
258
259
260 LOG.warn("Couldn't find oldest sequenceid for " + Bytes.toString(encodedRegionName));
261 }
262 return lowestUnflushedInRegion;
263 }
264
265 void completeCacheFlush(final byte [] encodedRegionName) {
266 synchronized (tieLock) {
267 this.flushingSequenceIds.remove(encodedRegionName);
268 }
269 }
270
271 void abortCacheFlush(final byte[] encodedRegionName) {
272
273
274 Map<byte[], Long> flushing = null;
275 Map<byte[], Long> tmpMap = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
276
277
278
279 synchronized (tieLock) {
280 flushing = this.flushingSequenceIds.remove(encodedRegionName);
281 if (flushing != null) {
282 Map<byte[], Long> unflushed = getOrCreateLowestSequenceIds(encodedRegionName);
283 for (Map.Entry<byte[], Long> e: flushing.entrySet()) {
284
285
286 tmpMap.put(e.getKey(), unflushed.put(e.getKey(), e.getValue()));
287 }
288 }
289 }
290
291
292
293 if (flushing != null) {
294 for (Map.Entry<byte[], Long> e : flushing.entrySet()) {
295 Long currentId = tmpMap.get(e.getKey());
296 if (currentId != null && currentId.longValue() <= e.getValue().longValue()) {
297 String errorStr = Bytes.toString(encodedRegionName) + " family " +
298 Bytes.toString(e.getKey()) + " acquired edits out of order current memstore seq=" +
299 currentId + ", previous oldest unflushed id=" + e.getValue();
300 LOG.error(errorStr);
301 Runtime.getRuntime().halt(1);
302 }
303 }
304 }
305 }
306
307
308
309
310
311
312
313
314 boolean areAllLower(Map<byte[], Long> sequenceids) {
315 Map<byte[], Long> flushing = null;
316 Map<byte[], Long> unflushed = null;
317 synchronized (this.tieLock) {
318
319
320 flushing = flattenToLowestSequenceId(this.flushingSequenceIds);
321 unflushed = flattenToLowestSequenceId(this.lowestUnflushedSequenceIds);
322 }
323 for (Map.Entry<byte[], Long> e : sequenceids.entrySet()) {
324 long oldestFlushing = Long.MAX_VALUE;
325 long oldestUnflushed = Long.MAX_VALUE;
326 if (flushing != null) {
327 if (flushing.containsKey(e.getKey())) oldestFlushing = flushing.get(e.getKey());
328 }
329 if (unflushed != null) {
330 if (unflushed.containsKey(e.getKey())) oldestUnflushed = unflushed.get(e.getKey());
331 }
332 long min = Math.min(oldestFlushing, oldestUnflushed);
333 if (min <= e.getValue()) return false;
334 }
335 return true;
336 }
337
338
339
340
341
342
343
344
345
346 byte[][] findLower(Map<byte[], Long> sequenceids) {
347 List<byte[]> toFlush = null;
348
349 synchronized (tieLock) {
350 for (Map.Entry<byte[], Long> e: sequenceids.entrySet()) {
351 Map<byte[], Long> m = this.lowestUnflushedSequenceIds.get(e.getKey());
352 if (m == null) continue;
353
354 long lowest = getLowestSequenceId(m);
355 if (lowest != HConstants.NO_SEQNUM && lowest <= e.getValue()) {
356 if (toFlush == null) toFlush = new ArrayList<byte[]>();
357 toFlush.add(e.getKey());
358 }
359 }
360 }
361 return toFlush == null? null: toFlush.toArray(new byte[][] { HConstants.EMPTY_BYTE_ARRAY });
362 }
363 }