1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.wal;
20
21 import java.io.IOException;
22 import java.io.InterruptedIOException;
23 import java.util.ArrayList;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.NavigableMap;
30 import java.util.TreeMap;
31 import java.util.UUID;
32 import java.util.concurrent.CountDownLatch;
33 import java.util.concurrent.TimeUnit;
34
35 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
36 import org.apache.hadoop.hbase.util.ByteStringer;
37 import org.apache.commons.logging.Log;
38 import org.apache.commons.logging.LogFactory;
39 import org.apache.hadoop.hbase.classification.InterfaceAudience;
40 import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
41 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
42 import org.apache.hadoop.hbase.HConstants;
43 import org.apache.hadoop.hbase.TableName;
44 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
45 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
46 import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
47 import org.apache.hadoop.hbase.regionserver.SequenceId;
48 import org.apache.hadoop.hbase.util.Bytes;
49 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
50
51 import com.google.common.annotations.VisibleForTesting;
52 import com.google.protobuf.ByteString;
53
54
55
56
57 import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
58 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
79 public class WALKey implements SequenceId, Comparable<WALKey> {
80 private static final Log LOG = LogFactory.getLog(WALKey.class);
81
82 @InterfaceAudience.Private
83 public MultiVersionConcurrencyControl getMvcc() {
84 return mvcc;
85 }
86
87
88
89
90
91
92
93
94
95 @InterfaceAudience.Private
96 public MultiVersionConcurrencyControl.WriteEntry getWriteEntry() throws InterruptedIOException {
97 try {
98 this.seqNumAssignedLatch.await();
99 } catch (InterruptedException ie) {
100
101 MultiVersionConcurrencyControl mvcc = getMvcc();
102 LOG.debug("mvcc=" + mvcc + ", writeEntry=" + this.writeEntry);
103 if (mvcc != null) {
104 if (this.writeEntry != null) {
105 mvcc.complete(this.writeEntry);
106 }
107 }
108 InterruptedIOException iie = new InterruptedIOException();
109 iie.initCause(ie);
110 throw iie;
111 }
112 return this.writeEntry;
113 }
114
115 @InterfaceAudience.Private
116 public void setWriteEntry(MultiVersionConcurrencyControl.WriteEntry writeEntry) {
117 this.writeEntry = writeEntry;
118 this.seqNumAssignedLatch.countDown();
119 }
120
121
122
123
124 @InterfaceAudience.Private
125 protected enum Version {
126 UNVERSIONED(0),
127
128 INITIAL(-1),
129
130
131 COMPRESSED(-2);
132
133 public final int code;
134 static final Version[] byCode;
135 static {
136 byCode = Version.values();
137 for (int i = 0; i < byCode.length; i++) {
138 if (byCode[i].code != -1 * i) {
139 throw new AssertionError("Values in this enum should be descending by one");
140 }
141 }
142 }
143
144 Version(int code) {
145 this.code = code;
146 }
147
148 public boolean atLeast(Version other) {
149 return code <= other.code;
150 }
151
152 public static Version fromCode(int code) {
153 return byCode[code * -1];
154 }
155 }
156
157
158
159
160
161 private static final String PREFIX_CLUSTER_KEY = ".";
162
163
164
165 @InterfaceAudience.Private
166 protected static final Version VERSION = Version.COMPRESSED;
167
168
169 public static final long NO_SEQUENCE_ID = -1;
170
171
172
173 @InterfaceAudience.Private
174 protected byte [] encodedRegionName;
175
176 @InterfaceAudience.Private
177 protected TableName tablename;
178
179 @InterfaceAudience.Private
180 protected long logSeqNum;
181 private long origLogSeqNum = 0;
182 private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
183
184
185 @InterfaceAudience.Private
186 protected long writeTime;
187
188
189
190 @InterfaceAudience.Private
191 protected List<UUID> clusterIds;
192
193 private NavigableMap<byte[], Integer> scopes;
194
195 private long nonceGroup = HConstants.NO_NONCE;
196 private long nonce = HConstants.NO_NONCE;
197 private MultiVersionConcurrencyControl mvcc;
198 private MultiVersionConcurrencyControl.WriteEntry writeEntry;
199 public static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
200
201
202 @InterfaceAudience.Private
203 protected CompressionContext compressionContext;
204
205 public WALKey() {
206 init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
207 new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE, null);
208 }
209
210 @VisibleForTesting
211 public WALKey(final byte[] encodedRegionName, final TableName tablename,
212 long logSeqNum,
213 final long now, UUID clusterId) {
214 List<UUID> clusterIds = new ArrayList<UUID>();
215 clusterIds.add(clusterId);
216 init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
217 HConstants.NO_NONCE, HConstants.NO_NONCE, null);
218 }
219
/**
 * Creates a key stamped with the current time and otherwise default values (see the
 * three-argument constructor this delegates to).
 *
 * <p>The time is taken from {@link EnvironmentEdgeManager#currentTime()} rather than
 * {@code System.currentTimeMillis()}, matching the other constructor in this class that stamps
 * the key itself, so that an injected clock is honoured consistently.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename table the edit belongs to
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
  this(encodedRegionName, tablename, EnvironmentEdgeManager.currentTime());
}
223
/**
 * Creates a key with the given write time, {@link #NO_SEQUENCE_ID} as sequence id, the shared
 * {@link #EMPTY_UUIDS} cluster-id list, no nonces and no mvcc.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename table the edit belongs to
 * @param now write time to store
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
  init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS,
      HConstants.NO_NONCE, HConstants.NO_NONCE, null);
}
234
/**
 * Creates a key with the given write time and mvcc, {@link #NO_SEQUENCE_ID} as sequence id,
 * the shared {@link #EMPTY_UUIDS} cluster-id list and no nonces.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename table the edit belongs to
 * @param now write time to store
 * @param mvcc concurrency control instance to store; may be {@code null}
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now,
    MultiVersionConcurrencyControl mvcc) {
  init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, EMPTY_UUIDS,
      HConstants.NO_NONCE, HConstants.NO_NONCE, mvcc);
}
248
249
250
251
252
253
254
255
256
257
258
259
260
261
/**
 * Creates a key from explicit values for every field that {@code init} sets.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename table the edit belongs to
 * @param logSeqNum log sequence number to store
 * @param now write time to store
 * @param clusterIds cluster-id list, stored by reference (not copied)
 * @param nonceGroup nonce group to store
 * @param nonce nonce to store
 * @param mvcc concurrency control instance to store; may be {@code null}
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
    final long now, List<UUID> clusterIds, long nonceGroup, long nonce,
    MultiVersionConcurrencyControl mvcc) {
  init(encodedRegionName,
      tablename,
      logSeqNum,
      now,
      clusterIds,
      nonceGroup,
      nonce,
      mvcc);
}
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
/**
 * Creates a key without an explicit sequence id; {@link #NO_SEQUENCE_ID} is stored instead.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename table the edit belongs to
 * @param now write time to store
 * @param clusterIds cluster-id list, stored by reference (not copied)
 * @param nonceGroup nonce group to store
 * @param nonce nonce to store
 * @param mvcc concurrency control instance to store; may be {@code null}
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename,
    final long now, List<UUID> clusterIds, long nonceGroup,
    final long nonce, final MultiVersionConcurrencyControl mvcc) {
  init(encodedRegionName,
      tablename,
      NO_SEQUENCE_ID,
      now,
      clusterIds,
      nonceGroup,
      nonce,
      mvcc);
}
292
293
294
295
296
297
298
299
300
301
302
303
304
305 public WALKey(final byte[] encodedRegionName,
306 final TableName tablename,
307 long logSeqNum,
308 long nonceGroup,
309 long nonce,
310 final MultiVersionConcurrencyControl mvcc) {
311 init(encodedRegionName,
312 tablename,
313 logSeqNum,
314 EnvironmentEdgeManager.currentTime(),
315 EMPTY_UUIDS,
316 nonceGroup,
317 nonce,
318 mvcc);
319 }
320
321 @InterfaceAudience.Private
322 protected void init(final byte[] encodedRegionName,
323 final TableName tablename,
324 long logSeqNum,
325 final long now,
326 List<UUID> clusterIds,
327 long nonceGroup,
328 long nonce,
329 MultiVersionConcurrencyControl mvcc) {
330 this.logSeqNum = logSeqNum;
331 this.writeTime = now;
332 this.clusterIds = clusterIds;
333 this.encodedRegionName = encodedRegionName;
334 this.tablename = tablename;
335 this.nonceGroup = nonceGroup;
336 this.nonce = nonce;
337 this.mvcc = mvcc;
338 }
339
340
341
342
343 public void setCompressionContext(CompressionContext compressionContext) {
344 this.compressionContext = compressionContext;
345 }
346
347
348 public byte [] getEncodedRegionName() {
349 return encodedRegionName;
350 }
351
352
353 public TableName getTablename() {
354 return tablename;
355 }
356
357
358 public long getLogSeqNum() {
359 return this.logSeqNum;
360 }
361
362
363
364
365
366
367 @InterfaceAudience.Private
368 public void setLogSeqNum(final long sequence) {
369 this.logSeqNum = sequence;
370
371 }
372
373
374
375
376
377 public void setOrigLogSeqNum(final long seqId) {
378 this.origLogSeqNum = seqId;
379 }
380
381
382
383
384
385 public long getOrigLogSeqNum() {
386 return this.origLogSeqNum;
387 }
388
389
390
391
392
393
394 @Override
395 public long getSequenceId() throws IOException {
396 return getSequenceId(-1);
397 }
398
399
400
401
402
403
404
405 public long getSequenceId(final long maxWaitForSeqId) throws IOException {
406
407
408
409
410
411
412
413 try {
414 if (maxWaitForSeqId < 0) {
415 this.seqNumAssignedLatch.await();
416 } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
417 throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
418 "ms; WAL system stuck or has gone away?");
419 }
420 } catch (InterruptedException ie) {
421 LOG.warn("Thread interrupted waiting for next log sequence number");
422 InterruptedIOException iie = new InterruptedIOException();
423 iie.initCause(ie);
424 throw iie;
425 }
426 return this.logSeqNum;
427 }
428
429
430
431
432 public long getWriteTime() {
433 return this.writeTime;
434 }
435
436 public NavigableMap<byte[], Integer> getScopes() {
437 return scopes;
438 }
439
440
441 public long getNonceGroup() {
442 return nonceGroup;
443 }
444
445
446 public long getNonce() {
447 return nonce;
448 }
449
450 public void setScopes(NavigableMap<byte[], Integer> scopes) {
451 this.scopes = scopes;
452 }
453
454 public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
455 if (scopes != null) {
456 Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
457 .iterator();
458 while (iterator.hasNext()) {
459 Map.Entry<byte[], Integer> scope = iterator.next();
460 String key = Bytes.toString(scope.getKey());
461 if (key.startsWith(PREFIX_CLUSTER_KEY)) {
462 addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
463 .length())));
464 iterator.remove();
465 }
466 }
467 if (scopes.size() > 0) {
468 this.scopes = scopes;
469 }
470 }
471 }
472
473
474
475
476 public void addClusterId(UUID clusterId) {
477 if (!clusterIds.contains(clusterId)) {
478 clusterIds.add(clusterId);
479 }
480 }
481
482
483
484
485 public List<UUID> getClusterIds() {
486 return clusterIds;
487 }
488
489
490
491
492
493 public UUID getOriginatingClusterId(){
494 return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
495 }
496
497 @Override
498 public String toString() {
499 return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
500 logSeqNum;
501 }
502
503
504
505
506
507
508
509
510 public Map<String, Object> toStringMap() {
511 Map<String, Object> stringMap = new HashMap<String, Object>();
512 stringMap.put("table", tablename);
513 stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
514 stringMap.put("sequence", logSeqNum);
515 return stringMap;
516 }
517
518 @Override
519 public boolean equals(Object obj) {
520 if (this == obj) {
521 return true;
522 }
523 if (obj == null || getClass() != obj.getClass()) {
524 return false;
525 }
526 return compareTo((WALKey)obj) == 0;
527 }
528
529 @Override
530 public int hashCode() {
531 int result = Bytes.hashCode(this.encodedRegionName);
532 result ^= this.logSeqNum;
533 result ^= this.writeTime;
534 return result;
535 }
536
537 @Override
538 public int compareTo(WALKey o) {
539 int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
540 if (result == 0) {
541 if (this.logSeqNum < o.logSeqNum) {
542 result = -1;
543 } else if (this.logSeqNum > o.logSeqNum) {
544 result = 1;
545 }
546 if (result == 0) {
547 if (this.writeTime < o.writeTime) {
548 result = -1;
549 } else if (this.writeTime > o.writeTime) {
550 return 1;
551 }
552 }
553 }
554
555 return result;
556 }
557
558
559
560
561
562
563
564 void internTableName(TableName tablename) {
565
566
567 assert tablename.equals(this.tablename);
568 this.tablename = tablename;
569 }
570
571
572
573
574
575
576
577 void internEncodedRegionName(byte []encodedRegionName) {
578
579
580 assert Bytes.equals(this.encodedRegionName, encodedRegionName);
581 this.encodedRegionName = encodedRegionName;
582 }
583
584 public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(
585 WALCellCodec.ByteStringCompressor compressor) throws IOException {
586 org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder =
587 org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
588 if (compressionContext == null) {
589 builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
590 builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
591 } else {
592 builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
593 compressionContext.regionDict));
594 builder.setTableName(compressor.compress(this.tablename.getName(),
595 compressionContext.tableDict));
596 }
597 builder.setLogSequenceNumber(this.logSeqNum);
598 builder.setWriteTime(writeTime);
599 if (this.origLogSeqNum > 0) {
600 builder.setOrigSequenceNumber(this.origLogSeqNum);
601 }
602 if (this.nonce != HConstants.NO_NONCE) {
603 builder.setNonce(nonce);
604 }
605 if (this.nonceGroup != HConstants.NO_NONCE) {
606 builder.setNonceGroup(nonceGroup);
607 }
608 HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
609 for (UUID clusterId : clusterIds) {
610 uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
611 uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
612 builder.addClusterIds(uuidBuilder.build());
613 }
614 if (scopes != null) {
615 for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
616 ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
617 : compressor.compress(e.getKey(), compressionContext.familyDict);
618 builder.addScopes(FamilyScope.newBuilder()
619 .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
620 }
621 }
622 return builder;
623 }
624
625 public void readFieldsFromPb(org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey,
626 WALCellCodec.ByteStringUncompressor uncompressor)
627 throws IOException {
628 if (this.compressionContext != null) {
629 this.encodedRegionName = uncompressor.uncompress(
630 walKey.getEncodedRegionName(), compressionContext.regionDict);
631 byte[] tablenameBytes = uncompressor.uncompress(
632 walKey.getTableName(), compressionContext.tableDict);
633 this.tablename = TableName.valueOf(tablenameBytes);
634 } else {
635 this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
636 this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
637 }
638 clusterIds.clear();
639 if (walKey.hasClusterId()) {
640
641
642 clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
643 .getLeastSigBits()));
644 }
645 for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
646 clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
647 }
648 if (walKey.hasNonceGroup()) {
649 this.nonceGroup = walKey.getNonceGroup();
650 }
651 if (walKey.hasNonce()) {
652 this.nonce = walKey.getNonce();
653 }
654 this.scopes = null;
655 if (walKey.getScopesCount() > 0) {
656 this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
657 for (FamilyScope scope : walKey.getScopesList()) {
658 byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
659 uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
660 this.scopes.put(family, scope.getScopeType().getNumber());
661 }
662 }
663 this.logSeqNum = walKey.getLogSequenceNumber();
664 this.writeTime = walKey.getWriteTime();
665 if(walKey.hasOrigSequenceNumber()) {
666 this.origLogSeqNum = walKey.getOrigSequenceNumber();
667 }
668 }
669 }