1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.NavigableMap;
30 import java.util.TreeMap;
31 import java.util.UUID;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.hadoop.hbase.util.ByteStringer;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38 import org.apache.hadoop.hbase.classification.InterfaceAudience;
39 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
40 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
41 import org.apache.hadoop.hbase.HConstants;
42 import org.apache.hadoop.hbase.TableName;
43 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
44 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
45 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
46 import org.apache.hadoop.hbase.regionserver.SequenceId;
47 import org.apache.hadoop.hbase.util.Bytes;
48 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49
50 import com.google.common.annotations.VisibleForTesting;
51 import com.google.protobuf.ByteString;
52
53
54
55
56
57 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
58 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
76 public class WALKey implements SequenceId, Comparable<WALKey> {
77 public static final Log LOG = LogFactory.getLog(WALKey.class);
78
79
80
81
82 @InterfaceAudience.Private
83 protected enum Version {
84 UNVERSIONED(0),
85
86 INITIAL(-1),
87
88
89 COMPRESSED(-2);
90
91 public final int code;
92 static final Version[] byCode;
93 static {
94 byCode = Version.values();
95 for (int i = 0; i < byCode.length; i++) {
96 if (byCode[i].code != -1 * i) {
97 throw new AssertionError("Values in this enum should be descending by one");
98 }
99 }
100 }
101
102 Version(int code) {
103 this.code = code;
104 }
105
106 public boolean atLeast(Version other) {
107 return code <= other.code;
108 }
109
110 public static Version fromCode(int code) {
111 return byCode[code * -1];
112 }
113 }
114
115
116
117
118
119 private static final String PREFIX_CLUSTER_KEY = ".";
120
121
122
123 @InterfaceAudience.Private
124 protected static final Version VERSION = Version.COMPRESSED;
125
126
127 public static final long NO_SEQUENCE_ID = -1;
128
129
130
131 @InterfaceAudience.Private
132 protected byte [] encodedRegionName;
133
134 @InterfaceAudience.Private
135 protected TableName tablename;
136
137 @InterfaceAudience.Private
138 protected long logSeqNum;
139 private long origLogSeqNum = 0;
140 private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
141
142
143 @InterfaceAudience.Private
144 protected long writeTime;
145
146
147
148 @InterfaceAudience.Private
149 protected List<UUID> clusterIds;
150
151 private NavigableMap<byte[], Integer> scopes;
152
153 private long nonceGroup = HConstants.NO_NONCE;
154 private long nonce = HConstants.NO_NONCE;
155 static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
156
157
158 @InterfaceAudience.Private
159 protected CompressionContext compressionContext;
160
161 public WALKey() {
162 init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
163 new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
164 }
165
166 @VisibleForTesting
167 public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
168 final long now, UUID clusterId) {
169 List<UUID> clusterIds = new ArrayList<UUID>();
170 clusterIds.add(clusterId);
171 init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
172 HConstants.NO_NONCE, HConstants.NO_NONCE);
173 }
174
175 public WALKey(final byte[] encodedRegionName, final TableName tablename) {
176 this(encodedRegionName, tablename, System.currentTimeMillis());
177 }
178
179 public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
180 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
181 EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
182 }
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197 public WALKey(final byte [] encodedRegionName, final TableName tablename,
198 long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
199 init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
200 }
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215 public WALKey(final byte [] encodedRegionName, final TableName tablename,
216 final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
217 init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
218 nonceGroup, nonce);
219 }
220
221
222
223
224
225
226
227
228
229
230
231
232
233 public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
234 long nonceGroup, long nonce) {
235 init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
236 EMPTY_UUIDS, nonceGroup, nonce);
237 }
238
239 @InterfaceAudience.Private
240 protected void init(final byte [] encodedRegionName, final TableName tablename,
241 long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
242 this.logSeqNum = logSeqNum;
243 this.writeTime = now;
244 this.clusterIds = clusterIds;
245 this.encodedRegionName = encodedRegionName;
246 this.tablename = tablename;
247 this.nonceGroup = nonceGroup;
248 this.nonce = nonce;
249 }
250
251
252
253
254 public void setCompressionContext(CompressionContext compressionContext) {
255 this.compressionContext = compressionContext;
256 }
257
258
259 public byte [] getEncodedRegionName() {
260 return encodedRegionName;
261 }
262
263
264 public TableName getTablename() {
265 return tablename;
266 }
267
268
269 public long getLogSeqNum() {
270 return this.logSeqNum;
271 }
272
273
274
275
276
277
278
279 @InterfaceAudience.Private
280 public void setLogSeqNum(final long sequence) {
281 this.logSeqNum = sequence;
282 this.seqNumAssignedLatch.countDown();
283 }
284
285
286
287
288
289 public void setOrigLogSeqNum(final long seqId) {
290 this.origLogSeqNum = seqId;
291 }
292
293
294
295
296
297 public long getOrigLogSeqNum() {
298 return this.origLogSeqNum;
299 }
300
301 @Override
302 public long getSequenceId() throws IOException {
303 return getSequenceId(-1);
304 }
305
306
307
308
309
310
311
312 public long getSequenceId(final long maxWaitForSeqId) throws IOException {
313
314
315
316 try {
317 if (maxWaitForSeqId < 0) {
318 this.seqNumAssignedLatch.await();
319 } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
320 throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
321 "ms; WAL system stuck or has gone away?");
322 }
323 } catch (InterruptedException ie) {
324 LOG.warn("Thread interrupted waiting for next log sequence number");
325 InterruptedIOException iie = new InterruptedIOException();
326 iie.initCause(ie);
327 throw iie;
328 }
329 return this.logSeqNum;
330 }
331
332
333
334
335 public long getWriteTime() {
336 return this.writeTime;
337 }
338
339 public NavigableMap<byte[], Integer> getScopes() {
340 return scopes;
341 }
342
343
344 public long getNonceGroup() {
345 return nonceGroup;
346 }
347
348
349 public long getNonce() {
350 return nonce;
351 }
352
353 public void setScopes(NavigableMap<byte[], Integer> scopes) {
354 this.scopes = scopes;
355 }
356
357 public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
358 if (scopes != null) {
359 Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
360 .iterator();
361 while (iterator.hasNext()) {
362 Map.Entry<byte[], Integer> scope = iterator.next();
363 String key = Bytes.toString(scope.getKey());
364 if (key.startsWith(PREFIX_CLUSTER_KEY)) {
365 addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
366 .length())));
367 iterator.remove();
368 }
369 }
370 if (scopes.size() > 0) {
371 this.scopes = scopes;
372 }
373 }
374 }
375
376
377
378
379 public void addClusterId(UUID clusterId) {
380 if (!clusterIds.contains(clusterId)) {
381 clusterIds.add(clusterId);
382 }
383 }
384
385
386
387
388 public List<UUID> getClusterIds() {
389 return clusterIds;
390 }
391
392
393
394
395
396 public UUID getOriginatingClusterId(){
397 return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
398 }
399
400 @Override
401 public String toString() {
402 return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
403 logSeqNum;
404 }
405
406
407
408
409
410
411
412
413 public Map<String, Object> toStringMap() {
414 Map<String, Object> stringMap = new HashMap<String, Object>();
415 stringMap.put("table", tablename);
416 stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
417 stringMap.put("sequence", logSeqNum);
418 return stringMap;
419 }
420
421 @Override
422 public boolean equals(Object obj) {
423 if (this == obj) {
424 return true;
425 }
426 if (obj == null || getClass() != obj.getClass()) {
427 return false;
428 }
429 return compareTo((WALKey)obj) == 0;
430 }
431
432 @Override
433 public int hashCode() {
434 int result = Bytes.hashCode(this.encodedRegionName);
435 result ^= this.logSeqNum;
436 result ^= this.writeTime;
437 return result;
438 }
439
440 @Override
441 public int compareTo(WALKey o) {
442 int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
443 if (result == 0) {
444 if (this.logSeqNum < o.logSeqNum) {
445 result = -1;
446 } else if (this.logSeqNum > o.logSeqNum) {
447 result = 1;
448 }
449 if (result == 0) {
450 if (this.writeTime < o.writeTime) {
451 result = -1;
452 } else if (this.writeTime > o.writeTime) {
453 return 1;
454 }
455 }
456 }
457
458 return result;
459 }
460
461
462
463
464
465
466
467 void internTableName(TableName tablename) {
468
469
470 assert tablename.equals(this.tablename);
471 this.tablename = tablename;
472 }
473
474
475
476
477
478
479
480 void internEncodedRegionName(byte []encodedRegionName) {
481
482
483 assert Bytes.equals(this.encodedRegionName, encodedRegionName);
484 this.encodedRegionName = encodedRegionName;
485 }
486
487 public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
488 throws IOException {
489 org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
490 if (compressionContext == null) {
491 builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
492 builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
493 } else {
494 builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
495 compressionContext.regionDict));
496 builder.setTableName(compressor.compress(this.tablename.getName(),
497 compressionContext.tableDict));
498 }
499 builder.setLogSequenceNumber(this.logSeqNum);
500 builder.setWriteTime(writeTime);
501 if(this.origLogSeqNum > 0) {
502 builder.setOrigSequenceNumber(this.origLogSeqNum);
503 }
504 if (this.nonce != HConstants.NO_NONCE) {
505 builder.setNonce(nonce);
506 }
507 if (this.nonceGroup != HConstants.NO_NONCE) {
508 builder.setNonceGroup(nonceGroup);
509 }
510 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
511 for (UUID clusterId : clusterIds) {
512 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
513 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
514 builder.addClusterIds(uuidBuilder.build());
515 }
516 if (scopes != null) {
517 for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
518 ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
519 : compressor.compress(e.getKey(), compressionContext.familyDict);
520 builder.addScopes(FamilyScope.newBuilder()
521 .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
522 }
523 }
524 return builder;
525 }
526
527 public void readFieldsFromPb(
528 org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
529 if (this.compressionContext != null) {
530 this.encodedRegionName = uncompressor.uncompress(
531 walKey.getEncodedRegionName(), compressionContext.regionDict);
532 byte[] tablenameBytes = uncompressor.uncompress(
533 walKey.getTableName(), compressionContext.tableDict);
534 this.tablename = TableName.valueOf(tablenameBytes);
535 } else {
536 this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
537 this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
538 }
539 clusterIds.clear();
540 if (walKey.hasClusterId()) {
541
542
543 clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
544 .getLeastSigBits()));
545 }
546 for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
547 clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
548 }
549 if (walKey.hasNonceGroup()) {
550 this.nonceGroup = walKey.getNonceGroup();
551 }
552 if (walKey.hasNonce()) {
553 this.nonce = walKey.getNonce();
554 }
555 this.scopes = null;
556 if (walKey.getScopesCount() > 0) {
557 this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
558 for (FamilyScope scope : walKey.getScopesList()) {
559 byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
560 uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
561 this.scopes.put(family, scope.getScopeType().getNumber());
562 }
563 }
564 this.logSeqNum = walKey.getLogSequenceNumber();
565 this.writeTime = walKey.getWriteTime();
566 if(walKey.hasOrigSequenceNumber()) {
567 this.origLogSeqNum = walKey.getOrigSequenceNumber();
568 }
569 }
570 }