View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.wal;
20  
21  import java.io.IOException;
22  import java.io.InterruptedIOException;
23  import java.util.ArrayList;
24  import java.util.Collections;
25  import java.util.HashMap;
26  import java.util.Iterator;
27  import java.util.List;
28  import java.util.Map;
29  import java.util.NavigableMap;
30  import java.util.TreeMap;
31  import java.util.UUID;
32  import java.util.concurrent.CountDownLatch;
33  import java.util.concurrent.TimeUnit;
34  
35  import org.apache.hadoop.hbase.util.ByteStringer;
36  import org.apache.commons.logging.Log;
37  import org.apache.commons.logging.LogFactory;
38  import org.apache.hadoop.hbase.classification.InterfaceAudience;
39  import org.apache.hadoop.hbase.exceptions.TimeoutIOException;
40  import org.apache.hadoop.hbase.HBaseInterfaceAudience;
41  import org.apache.hadoop.hbase.HConstants;
42  import org.apache.hadoop.hbase.TableName;
43  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
44  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
45  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
46  import org.apache.hadoop.hbase.regionserver.SequenceId;
47  import org.apache.hadoop.hbase.util.Bytes;
48  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
49  
50  import com.google.common.annotations.VisibleForTesting;
51  import com.google.protobuf.ByteString;
52  
53  
54  
55  
56  // imports for things that haven't moved from regionserver.wal yet.
57  import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
58  import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
59  
60  /**
61   * A Key for an entry in the change log.
62   *
63   * The log intermingles edits to many tables and rows, so each log entry
64   * identifies the appropriate table and row.  Within a table and row, they're
65   * also sorted.
66   *
67   * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
68   * associated row.
69   *
70   * Note that protected members marked @InterfaceAudience.Private are only protected
71   * to support the legacy HLogKey class, which is in a different package.
72   */
73  // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
74  //       purposes. They need to be merged into WALEntry.
75  @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
76  public class WALKey implements SequenceId, Comparable<WALKey> {
77    public static final Log LOG = LogFactory.getLog(WALKey.class);
78  
79    // should be < 0 (@see HLogKey#readFields(DataInput))
80    // version 2 supports WAL compression
81    // public members here are only public because of HLogKey
82    @InterfaceAudience.Private
83    protected enum Version {
84      UNVERSIONED(0),
85      // Initial number we put on WALKey when we introduced versioning.
86      INITIAL(-1),
87      // Version -2 introduced a dictionary compression facility.  Only this
88      // dictionary-based compression is available in version -2.
89      COMPRESSED(-2);
90  
91      public final int code;
92      static final Version[] byCode;
93      static {
94        byCode = Version.values();
95        for (int i = 0; i < byCode.length; i++) {
96          if (byCode[i].code != -1 * i) {
97            throw new AssertionError("Values in this enum should be descending by one");
98          }
99        }
100     }
101 
102     Version(int code) {
103       this.code = code;
104     }
105 
106     public boolean atLeast(Version other) {
107       return code <= other.code;
108     }
109 
110     public static Version fromCode(int code) {
111       return byCode[code * -1];
112     }
113   }
114 
115   /*
116    * This is used for reading the log entries created by the previous releases
117    * (0.94.11) which write the clusters information to the scopes of WALEdit.
118    */
119   private static final String PREFIX_CLUSTER_KEY = ".";
120 
121 
122   // visible for deprecated HLogKey
123   @InterfaceAudience.Private
124   protected static final Version VERSION = Version.COMPRESSED;
125 
126   /** Used to represent when a particular wal key doesn't know/care about the sequence ordering. */
127   public static final long NO_SEQUENCE_ID = -1;
128 
129 
130   // visible for deprecated HLogKey
131   @InterfaceAudience.Private
132   protected byte [] encodedRegionName;
133   // visible for deprecated HLogKey
134   @InterfaceAudience.Private
135   protected TableName tablename;
136   // visible for deprecated HLogKey
137   @InterfaceAudience.Private
138   protected long logSeqNum;
139   private long origLogSeqNum = 0;
140   private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
141   // Time at which this edit was written.
142   // visible for deprecated HLogKey
143   @InterfaceAudience.Private
144   protected long writeTime;
145 
146   // The first element in the list is the cluster id on which the change has originated
147   // visible for deprecated HLogKey
148   @InterfaceAudience.Private
149   protected List<UUID> clusterIds;
150 
151   private NavigableMap<byte[], Integer> scopes;
152 
153   private long nonceGroup = HConstants.NO_NONCE;
154   private long nonce = HConstants.NO_NONCE;
155   static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());
156 
157   // visible for deprecated HLogKey
158   @InterfaceAudience.Private
159   protected CompressionContext compressionContext;
160 
161   public WALKey() {
162     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
163         new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
164   }
165 
166   @VisibleForTesting
167   public WALKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
168       final long now, UUID clusterId) {
169     List<UUID> clusterIds = new ArrayList<UUID>();
170     clusterIds.add(clusterId);
171     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
172         HConstants.NO_NONCE, HConstants.NO_NONCE);
173   }
174 
/**
 * Creates a key stamped with the current time, with no sequence id assigned yet.
 * The time comes from {@link EnvironmentEdgeManager#currentTime()}, the same clock the
 * nonce-taking constructor of this class uses.
 *
 * @param encodedRegionName encoded name of the region
 * @param tablename name of the table
 */
public WALKey(final byte[] encodedRegionName, final TableName tablename) {
  this(encodedRegionName, tablename, EnvironmentEdgeManager.currentTime());
}
178 
179   public WALKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
180     init(encodedRegionName, tablename, NO_SEQUENCE_ID, now,
181         EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
182   }
183 
/**
 * Creates a fully specified log key for writing.
 * The table name is kept mainly for debugging; a region name is always a sub-table object.
 * <p>Used by log splitting and snapshots.
 *
 * @param encodedRegionName encoded name of the region as returned by
 * <code>HRegionInfo#getEncodedNameAsBytes()</code>
 * @param tablename name of the table
 * @param logSeqNum log sequence number
 * @param now time at which this edit was written
 * @param clusterIds the clusters that have consumed the change (used in Replication); the
 *          list is stored as given, not copied
 * @param nonceGroup nonce group to record on the key
 * @param nonce nonce to record on the key
 */
public WALKey(final byte [] encodedRegionName, final TableName tablename,
    long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
  this.init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
}
201 
202   /**
203    * Create the log key for writing to somewhere.
204    * We maintain the tablename mainly for debugging purposes.
205    * A regionName is always a sub-table object.
206    *
207    * @param encodedRegionName Encoded name of the region as returned by
208    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
209    * @param tablename
210    * @param now Time at which this edit was written.
211    * @param clusterIds the clusters that have consumed the change(used in Replication)
212    * @param nonceGroup
213    * @param nonce
214    */
215   public WALKey(final byte [] encodedRegionName, final TableName tablename,
216       final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
217     init(encodedRegionName, tablename, NO_SEQUENCE_ID, now, clusterIds,
218       nonceGroup, nonce);
219   }
220 
221   /**
222    * Create the log key for writing to somewhere.
223    * We maintain the tablename mainly for debugging purposes.
224    * A regionName is always a sub-table object.
225    *
226    * @param encodedRegionName Encoded name of the region as returned by
227    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
228    * @param tablename
229    * @param logSeqNum
230    * @param nonceGroup
231    * @param nonce
232    */
233   public WALKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
234       long nonceGroup, long nonce) {
235     init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
236       EMPTY_UUIDS, nonceGroup, nonce);
237   }
238 
239   @InterfaceAudience.Private
240   protected void init(final byte [] encodedRegionName, final TableName tablename,
241       long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
242     this.logSeqNum = logSeqNum;
243     this.writeTime = now;
244     this.clusterIds = clusterIds;
245     this.encodedRegionName = encodedRegionName;
246     this.tablename = tablename;
247     this.nonceGroup = nonceGroup;
248     this.nonce = nonce;
249   }
250 
251   /**
252    * @param compressionContext Compression context to use
253    */
254   public void setCompressionContext(CompressionContext compressionContext) {
255     this.compressionContext = compressionContext;
256   }
257 
258   /** @return encoded region name */
259   public byte [] getEncodedRegionName() {
260     return encodedRegionName;
261   }
262 
263   /** @return table name */
264   public TableName getTablename() {
265     return tablename;
266   }
267 
268   /** @return log sequence number */
269   public long getLogSeqNum() {
270     return this.logSeqNum;
271   }
272 
273   /**
274    * Allow that the log sequence id to be set post-construction and release all waiters on assigned
275    * sequence number.
276    * Only public for {@link org.apache.hadoop.hbase.regionserver.wal.FSWALEntry}
277    * @param sequence
278    */
279   @InterfaceAudience.Private
280   public void setLogSeqNum(final long sequence) {
281     this.logSeqNum = sequence;
282     this.seqNumAssignedLatch.countDown();
283   }
284 
285   /**
286    * Used to set original seq Id for WALKey during wal replay
287    * @param seqId
288    */
289   public void setOrigLogSeqNum(final long seqId) {
290     this.origLogSeqNum = seqId;
291   }
292 
293   /**
294    * Return a positive long if current WALKey is created from a replay edit
295    * @return original sequence number of the WALEdit
296    */
297   public long getOrigLogSeqNum() {
298     return this.origLogSeqNum;
299   }
300 
301   @Override
302   public long getSequenceId() throws IOException {
303     return getSequenceId(-1);
304   }
305 
306   /**
307    * Wait for sequence number to be assigned &amp; return the assigned value.
308    * @param maxWaitForSeqId maximum time to wait in milliseconds for sequenceid
309    * @return long the new assigned sequence number
310    * @throws InterruptedException
311    */
312   public long getSequenceId(final long maxWaitForSeqId) throws IOException {
313     // TODO: This implementation waiting on a latch is problematic because if a higher level
314     // determines we should stop or abort, there is not global list of all these blocked WALKeys
315     // waiting on a sequence id; they can't be cancelled... interrupted. See getNextSequenceId
316     try {
317       if (maxWaitForSeqId < 0) {
318         this.seqNumAssignedLatch.await();
319       } else if (!this.seqNumAssignedLatch.await(maxWaitForSeqId, TimeUnit.MILLISECONDS)) {
320         throw new TimeoutIOException("Failed to get sequenceid after " + maxWaitForSeqId +
321           "ms; WAL system stuck or has gone away?");
322       }
323     } catch (InterruptedException ie) {
324       LOG.warn("Thread interrupted waiting for next log sequence number");
325       InterruptedIOException iie = new InterruptedIOException();
326       iie.initCause(ie);
327       throw iie;
328     }
329     return this.logSeqNum;
330   }
331 
332   /**
333    * @return the write time
334    */
335   public long getWriteTime() {
336     return this.writeTime;
337   }
338 
339   public NavigableMap<byte[], Integer> getScopes() {
340     return scopes;
341   }
342 
343   /** @return The nonce group */
344   public long getNonceGroup() {
345     return nonceGroup;
346   }
347 
348   /** @return The nonce */
349   public long getNonce() {
350     return nonce;
351   }
352 
353   public void setScopes(NavigableMap<byte[], Integer> scopes) {
354     this.scopes = scopes;
355   }
356 
357   public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
358     if (scopes != null) {
359       Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
360           .iterator();
361       while (iterator.hasNext()) {
362         Map.Entry<byte[], Integer> scope = iterator.next();
363         String key = Bytes.toString(scope.getKey());
364         if (key.startsWith(PREFIX_CLUSTER_KEY)) {
365           addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
366               .length())));
367           iterator.remove();
368         }
369       }
370       if (scopes.size() > 0) {
371         this.scopes = scopes;
372       }
373     }
374   }
375 
376   /**
377    * Marks that the cluster with the given clusterId has consumed the change
378    */
379   public void addClusterId(UUID clusterId) {
380     if (!clusterIds.contains(clusterId)) {
381       clusterIds.add(clusterId);
382     }
383   }
384 
385   /**
386    * @return the set of cluster Ids that have consumed the change
387    */
388   public List<UUID> getClusterIds() {
389     return clusterIds;
390   }
391 
392   /**
393    * @return the cluster id on which the change has originated. It there is no such cluster, it
394    *         returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
395    */
396   public UUID getOriginatingClusterId(){
397     return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
398   }
399 
400   @Override
401   public String toString() {
402     return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
403       logSeqNum;
404   }
405 
406   /**
407    * Produces a string map for this key. Useful for programmatic use and
408    * manipulation of the data stored in an WALKey, for example, printing
409    * as JSON.
410    *
411    * @return a Map containing data from this key
412    */
413   public Map<String, Object> toStringMap() {
414     Map<String, Object> stringMap = new HashMap<String, Object>();
415     stringMap.put("table", tablename);
416     stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
417     stringMap.put("sequence", logSeqNum);
418     return stringMap;
419   }
420 
421   @Override
422   public boolean equals(Object obj) {
423     if (this == obj) {
424       return true;
425     }
426     if (obj == null || getClass() != obj.getClass()) {
427       return false;
428     }
429     return compareTo((WALKey)obj) == 0;
430   }
431 
432   @Override
433   public int hashCode() {
434     int result = Bytes.hashCode(this.encodedRegionName);
435     result ^= this.logSeqNum;
436     result ^= this.writeTime;
437     return result;
438   }
439 
440   @Override
441   public int compareTo(WALKey o) {
442     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
443     if (result == 0) {
444       if (this.logSeqNum < o.logSeqNum) {
445         result = -1;
446       } else if (this.logSeqNum  > o.logSeqNum) {
447         result = 1;
448       }
449       if (result == 0) {
450         if (this.writeTime < o.writeTime) {
451           result = -1;
452         } else if (this.writeTime > o.writeTime) {
453           return 1;
454         }
455       }
456     }
457     // why isn't cluster id accounted for?
458     return result;
459   }
460 
461   /**
462    * Drop this instance's tablename byte array and instead
463    * hold a reference to the provided tablename. This is not
464    * meant to be a general purpose setter - it's only used
465    * to collapse references to conserve memory.
466    */
467   void internTableName(TableName tablename) {
468     // We should not use this as a setter - only to swap
469     // in a new reference to the same table name.
470     assert tablename.equals(this.tablename);
471     this.tablename = tablename;
472   }
473 
474   /**
475    * Drop this instance's region name byte array and instead
476    * hold a reference to the provided region name. This is not
477    * meant to be a general purpose setter - it's only used
478    * to collapse references to conserve memory.
479    */
480   void internEncodedRegionName(byte []encodedRegionName) {
481     // We should not use this as a setter - only to swap
482     // in a new reference to the same table name.
483     assert Bytes.equals(this.encodedRegionName, encodedRegionName);
484     this.encodedRegionName = encodedRegionName;
485   }
486 
487   public org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
488   throws IOException {
489     org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.Builder builder = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.newBuilder();
490     if (compressionContext == null) {
491       builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
492       builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
493     } else {
494       builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
495         compressionContext.regionDict));
496       builder.setTableName(compressor.compress(this.tablename.getName(),
497         compressionContext.tableDict));
498     }
499     builder.setLogSequenceNumber(this.logSeqNum);
500     builder.setWriteTime(writeTime);
501     if(this.origLogSeqNum > 0) {
502       builder.setOrigSequenceNumber(this.origLogSeqNum);
503     }
504     if (this.nonce != HConstants.NO_NONCE) {
505       builder.setNonce(nonce);
506     }
507     if (this.nonceGroup != HConstants.NO_NONCE) {
508       builder.setNonceGroup(nonceGroup);
509     }
510     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
511     for (UUID clusterId : clusterIds) {
512       uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
513       uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
514       builder.addClusterIds(uuidBuilder.build());
515     }
516     if (scopes != null) {
517       for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
518         ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
519             : compressor.compress(e.getKey(), compressionContext.familyDict);
520         builder.addScopes(FamilyScope.newBuilder()
521             .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
522       }
523     }
524     return builder;
525   }
526 
527   public void readFieldsFromPb(
528       org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
529     if (this.compressionContext != null) {
530       this.encodedRegionName = uncompressor.uncompress(
531           walKey.getEncodedRegionName(), compressionContext.regionDict);
532       byte[] tablenameBytes = uncompressor.uncompress(
533           walKey.getTableName(), compressionContext.tableDict);
534       this.tablename = TableName.valueOf(tablenameBytes);
535     } else {
536       this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
537       this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
538     }
539     clusterIds.clear();
540     if (walKey.hasClusterId()) {
541       //When we are reading the older log (0.95.1 release)
542       //This is definitely the originating cluster
543       clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
544           .getLeastSigBits()));
545     }
546     for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
547       clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
548     }
549     if (walKey.hasNonceGroup()) {
550       this.nonceGroup = walKey.getNonceGroup();
551     }
552     if (walKey.hasNonce()) {
553       this.nonce = walKey.getNonce();
554     }
555     this.scopes = null;
556     if (walKey.getScopesCount() > 0) {
557       this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
558       for (FamilyScope scope : walKey.getScopesList()) {
559         byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
560           uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
561         this.scopes.put(family, scope.getScopeType().getNumber());
562       }
563     }
564     this.logSeqNum = walKey.getLogSequenceNumber();
565     this.writeTime = walKey.getWriteTime();
566     if(walKey.hasOrigSequenceNumber()) {
567       this.origLogSeqNum = walKey.getOrigSequenceNumber();
568     }
569   }
570 }