1   
2   
3   
4   
5   
6   
7   
8   
9   
10  
11  
12  
13  
14  
15  
16  
17  
18  
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.UUID;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.hadoop.hbase.util.ByteStringer;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
40  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
44  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
45  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
46  import org.apache.hadoop.hbase.regionserver.SequenceId;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.protobuf.ByteString;
52  
53  
54  
55  
56  
57  import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
58  import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
59  
60  
61  
62  
63  
64  
65  
66  
67  
68  
69  
70  
71  
72  
73  
74  
75  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
76  public class WALKey implements SequenceId, Comparable<WALKey> {
77    public static final Log LOG = LogFactory.getLog(WALKey.class);
78  
79    
80    
81    
82    @InterfaceAudience.Private
83    protected enum Version {
84      UNVERSIONED(0),
85      
86      INITIAL(-1),
87      
88      
89      COMPRESSED(-2);
90  
91      public final int code;
92      static final Version[] byCode;
93      static {
94        byCode = Version.values();
95        for (int i = 0; i < byCode.length; i++) {
96          if (byCode[i].code != -1 * i) {
97            throw new AssertionError("Values in this enum should be descending by one");
98          }
99        }
100     }
101 
102     Version(int code) {
103       this.code = code;
104     }
105 
106     public boolean atLeast(Version other) {
107       return code <= other.code;
108     }
109 
110     public static Version fromCode(int code) {
111       return byCode[code * -1];
112     }
113   }
114 
115   
116 
117 
118 
119   private static final String PREFIX_CLUSTER_KEY = ".";
120 
121 
122   
123   @InterfaceAudience.Private
124   protected static final Version VERSION = Version.COMPRESSED;
125 
126   
127   public static final long NO_SEQUENCE_ID = -1;
128 
129 
130   
131   @InterfaceAudience.Private
132   protected byte [] encodedRegionName;
133   
134   @InterfaceAudience.Private
135   protected TableName tablename;
136   
137   @InterfaceAudience.Private
138   protected long logSeqNum;
139   private long origLogSeqNum = 0;
140   private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
141   
142   
143   @InterfaceAudience.Private
144   protected long writeTime;
145 
146   
147   
148   @InterfaceAudience.Private
149   protected List<UUID> clusterIds;
150 
151   private NavigableMap<byte[], Integer> scopes;
152 
153   private long nonceGroup = HConstants.NO_NONCE;
154   private long nonce = HConstants.NO_NONCE;
155   static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
156 
157   
158   @InterfaceAudience.Private
159   protected CompressionContext compressionContext;
160 
161   public WALKey() {
162     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
163         new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
164   }
165 
166   @VisibleForTesting
167   public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
168       final long now, UUID clusterId) {
169     List<UUID> clusterIds = new ArrayList<UUID>();
170     clusterIds.add(clusterId);
171     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
172         HConstants.NO_NONCE, HConstants.NO_NONCE);
173   }
174 
175   public WALKey(final byte[] encodedRegionName, final TableName tablename) {
176     this(encodedRegionName, tablename, System.currentTimeMillis());
177   }
178 
179   public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
180     init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
181         EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
182   }
183 
184   
185 
186 
187 
188 
189 
190 
191 
192 
193 
194 
195 
196 
197   public WALKey(final byte [] encodedRegionName, final TableName tablename,
198       long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
199     init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
200   }
201 
202   
203 
204 
205 
206 
207 
208 
209 
210 
211 
212 
213 
214 
215   public WALKey(final byte [] encodedRegionName, final TableName tablename,
216       final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
217     init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
218       nonceGroup, nonce);
219   }
220 
221   
222 
223 
224 
225 
226 
227 
228 
229 
230 
231 
232 
233   public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
234       long nonceGroup, long nonce) {
235     init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
236       EMPTY_UUIDS, nonceGroup, nonce);
237   }
238 
239   @InterfaceAudience.Private
240   protected void init(final byte [] encodedRegionName, final TableName tablename,
241       long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
242     this.logSeqNum = logSeqNum;
243     this.writeTime = now;
244     this.clusterIds = clusterIds;
245     this.encodedRegionName = encodedRegionName;
246     this.tablename = tablename;
247     this.nonceGroup = nonceGroup;
248     this.nonce = nonce;
249   }
250 
251   
252 
253 
254   public void setCompressionContext(CompressionContext compressionContext) {
255     this.compressionContext = compressionContext;
256   }
257 
258   
259   public byte [] getEncodedRegionName() {
260     return encodedRegionName;
261   }
262 
263   
264   public TableName getTablename() {
265     return tablename;
266   }
267 
268   
269   public long getLogSeqNum() {
270     return this.logSeqNum;
271   }
272 
273   
274 
275 
276 
277 
278 
279   @InterfaceAudience.Private
280   public void setLogSeqNum(final long sequence) {
281     this.logSeqNum = sequence;
282     this.seqNumAssignedLatch.countDown();
283   }
284 
285   
286 
287 
288 
289   public void setOrigLogSeqNum(final long seqId) {
290     this.origLogSeqNum = seqId;
291   }
292 
293   
294 
295 
296 
297   public long getOrigLogSeqNum() {
298     return this.origLogSeqNum;
299   }
300 
301   @Override
302   public long getSequenceId() throws IOException {
303     return getSequenceId(-1);
304   }
305 
306   
307 
308 
309 
310 
311 
312   public long getSequenceId(final long maxWaitForSeqId) throws IOException {
313     
314     
315     
316     try {
317       if (maxWaitForSeqId < 0) {
318         this.seqNumAssignedLatch.await();
319       } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
320         throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
321           "ms; WAL system stuck or has gone away?");
322       }
323     } catch (InterruptedException ie) {
324       LOG.warn("Thread interrupted waiting for next log sequence number");
325       InterruptedIOException iie = new InterruptedIOException();
326       iie.initCause(ie);
327       throw iie;
328     }
329     return this.logSeqNum;
330   }
331 
332   
333 
334 
335   public long getWriteTime() {
336     return this.writeTime;
337   }
338 
339   public NavigableMap<byte[], Integer> getScopes() {
340     return scopes;
341   }
342 
343   
344   public long getNonceGroup() {
345     return nonceGroup;
346   }
347 
348   
349   public long getNonce() {
350     return nonce;
351   }
352 
353   public void setScopes(NavigableMap<byte[], Integer> scopes) {
354     this.scopes = scopes;
355   }
356 
357   public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
358     if (scopes != null) {
359       Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
360           .iterator();
361       while (iterator.hasNext()) {
362         Map.Entry<byte[], Integer> scope = iterator.next();
363         String key = Bytes.toString(scope.getKey());
364         if (key.startsWith(PREFIX_CLUSTER_KEY)) {
365           addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
366               .length())));
367           iterator.remove();
368         }
369       }
370       if (scopes.size() > 0) {
371         this.scopes = scopes;
372       }
373     }
374   }
375 
376   
377 
378 
379   public void addClusterId(UUID clusterId) {
380     if (!clusterIds.contains(clusterId)) {
381       clusterIds.add(clusterId);
382     }
383   }
384 
385   
386 
387 
388   public List<UUID> getClusterIds() {
389     return clusterIds;
390   }
391 
392   
393 
394 
395 
396   public UUID getOriginatingClusterId(){
397     return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
398   }
399 
400   @Override
401   public String toString() {
402     return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
403       logSeqNum;
404   }
405 
406   
407 
408 
409 
410 
411 
412 
413   public Map<String, Object> toStringMap() {
414     Map<String, Object> stringMap = new HashMap<String, Object>();
415     stringMap.put("table", tablename);
416     stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
417     stringMap.put("sequence", logSeqNum);
418     return stringMap;
419   }
420 
421   @Override
422   public boolean equals(Object obj) {
423     if (this == obj) {
424       return true;
425     }
426     if (obj == null || getClass() != obj.getClass()) {
427       return false;
428     }
429     return compareTo((WALKey)obj) == 0;
430   }
431 
432   @Override
433   public int hashCode() {
434     int result = Bytes.hashCode(this.encodedRegionName);
435     result ^= this.logSeqNum;
436     result ^= this.writeTime;
437     return result;
438   }
439 
440   @Override
441   public int compareTo(WALKey o) {
442     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
443     if (result == 0) {
444       if (this.logSeqNum < o.logSeqNum) {
445         result = -1;
446       } else if (this.logSeqNum  > o.logSeqNum) {
447         result = 1;
448       }
449       if (result == 0) {
450         if (this.writeTime < o.writeTime) {
451           result = -1;
452         } else if (this.writeTime > o.writeTime) {
453           return 1;
454         }
455       }
456     }
457     
458     return result;
459   }
460 
461   
462 
463 
464 
465 
466 
467   void internTableName(TableName tablename) {
468     
469     
470     assert tablename.equals(this.tablename);
471     this.tablename = tablename;
472   }
473 
474   
475 
476 
477 
478 
479 
480   void internEncodedRegionName(byte []encodedRegionName) {
481     
482     
483     assert Bytes.equals(this.encodedRegionName, encodedRegionName);
484     this.encodedRegionName = encodedRegionName;
485   }
486 
487   public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
488   throws IOException {
489     org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
490     if (compressionContext == null) {
491       builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
492       builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
493     } else {
494       builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
495         compressionContext.regionDict));
496       builder.setTableName(compressor.compress(this.tablename.getName(),
497         compressionContext.tableDict));
498     }
499     builder.setLogSequenceNumber(this.logSeqNum);
500     builder.setWriteTime(writeTime);
501     if(this.origLogSeqNum > 0) {
502       builder.setOrigSequenceNumber(this.origLogSeqNum);
503     }
504     if (this.nonce != HConstants.NO_NONCE) {
505       builder.setNonce(nonce);
506     }
507     if (this.nonceGroup != HConstants.NO_NONCE) {
508       builder.setNonceGroup(nonceGroup);
509     }
510     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
511     for (UUID clusterId : clusterIds) {
512       uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
513       uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
514       builder.addClusterIds(uuidBuilder.build());
515     }
516     if (scopes != null) {
517       for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
518         ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
519             : compressor.compress(e.getKey(), compressionContext.familyDict);
520         builder.addScopes(FamilyScope.newBuilder()
521             .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
522       }
523     }
524     return builder;
525   }
526 
527   public void readFieldsFromPb(
528       org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
529     if (this.compressionContext != null) {
530       this.encodedRegionName = uncompressor.uncompress(
531           walKey.getEncodedRegionName(), compressionContext.regionDict);
532       byte[] tablenameBytes = uncompressor.uncompress(
533           walKey.getTableName(), compressionContext.tableDict);
534       this.tablename = TableName.valueOf(tablenameBytes);
535     } else {
536       this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
537       this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
538     }
539     clusterIds.clear();
540     if (walKey.hasClusterId()) {
541       
542       
543       clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
544           .getLeastSigBits()));
545     }
546     for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
547       clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
548     }
549     if (walKey.hasNonceGroup()) {
550       this.nonceGroup = walKey.getNonceGroup();
551     }
552     if (walKey.hasNonce()) {
553       this.nonce = walKey.getNonce();
554     }
555     this.scopes = null;
556     if (walKey.getScopesCount() > 0) {
557       this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
558       for (FamilyScope scope : walKey.getScopesList()) {
559         byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
560           uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
561         this.scopes.put(family, scope.getScopeType().getNumber());
562       }
563     }
564     this.logSeqNum = walKey.getLogSequenceNumber();
565     this.writeTime = walKey.getWriteTime();
566     if(walKey.hasOrigSequenceNumber()) {
567       this.origLogSeqNum = walKey.getOrigSequenceNumber();
568     }
569   }
570 }