
/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.wal;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
import org.apache.hadoop.hbase.regionserver.SequenceId;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.ByteString;

/**
 * A key for an entry in the change log.
 *
 * The log intermingles edits to many tables and rows, so each log entry
 * identifies the appropriate table and row. Within a table and row, entries
 * are also sorted by sequence number.
 *
 * <p>Some transactional edits (START, COMMIT, ABORT) will not have an
 * associated row.
 */
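// A minimal construction sketch ("info" is an HRegionInfo; any surrounding
// WAL append call is elided because its signature varies by release):
//
//   HLogKey key = new HLogKey(info.getEncodedNameAsBytes(),
//       info.getTable(), System.currentTimeMillis());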
// TODO: Key and WALEdit are never used separately, or in a one-to-many relation, for practical
//       purposes. They need to be merged into HLogEntry.
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
public class HLogKey implements WritableComparable<HLogKey>, SequenceId {
  public static final Log LOG = LogFactory.getLog(HLogKey.class);

  // Version codes should be < 0 (@see #readFields(DataInput)).
  // Version 2 supports HLog compression.
  enum Version {
    UNVERSIONED(0),
    // Initial number we put on HLogKey when we introduced versioning.
    INITIAL(-1),
    // Version -2 introduced a dictionary compression facility.  Only this
    // dictionary-based compression is available in version -2.
    COMPRESSED(-2);

    final int code;
    static final Version[] byCode;
    static {
      byCode = Version.values();
      for (int i = 0; i < byCode.length; i++) {
        if (byCode[i].code != -1 * i) {
          throw new AssertionError("Values in this enum should be descending by one");
        }
      }
    }

    Version(int code) {
      this.code = code;
    }

    boolean atLeast(Version other) {
      return code <= other.code;
    }

    static Version fromCode(int code) {
      return byCode[code * -1];
    }
  }
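  // An example of the mapping enforced by the static check above: codes
  // descend by one, so fromCode(-2) negates -2 to index 2 and yields
  // COMPRESSED, and COMPRESSED.atLeast(INITIAL) holds because -2 <= -1.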

  /*
   * This is used for reading log entries created by previous releases
   * (0.94.11), which wrote the cluster information into the scopes of WALEdit.
   */
  private static final String PREFIX_CLUSTER_KEY = ".";
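  // Worked example (UUID abbreviated): a legacy scope entry keyed
  // ".0f1a2b3c-..." is recognized in readOlderScopes() below, stripped of
  // the "." prefix, parsed as a cluster UUID, and removed from the scope map.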

  private static final Version VERSION = Version.COMPRESSED;

  // The encoded region name.
  private byte [] encodedRegionName;
  private TableName tablename;
  private long logSeqNum;
  private long origLogSeqNum = 0;
  private CountDownLatch seqNumAssignedLatch = new CountDownLatch(1);
  // Time at which this edit was written.
  private long writeTime;

  // The first element in the list is the cluster id on which the change originated.
  private List<UUID> clusterIds;

  private NavigableMap<byte[], Integer> scopes;

  private long nonceGroup = HConstants.NO_NONCE;
  private long nonce = HConstants.NO_NONCE;
  static final List<UUID> EMPTY_UUIDS = Collections.unmodifiableList(new ArrayList<UUID>());

  private CompressionContext compressionContext;

  public HLogKey() {
    init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
        new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  @VisibleForTesting
  public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
      final long now, UUID clusterId) {
    List<UUID> clusterIds = new ArrayList<UUID>();
    clusterIds.add(clusterId);
    init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
        HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  public HLogKey(final byte[] encodedRegionName, final TableName tablename) {
    this(encodedRegionName, tablename, System.currentTimeMillis());
  }

  public HLogKey(final byte[] encodedRegionName, final TableName tablename, final long now) {
    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now,
        EMPTY_UUIDS, HConstants.NO_NONCE, HConstants.NO_NONCE);
  }

  /**
   * Create the log key for writing to the WAL.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   * <p>Used by log splitting and snapshots.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
   * @param tablename name of the table
   * @param logSeqNum log sequence number
   * @param now Time at which this edit was written.
   * @param clusterIds the clusters that have consumed the change (used in replication)
   * @param nonceGroup the nonce group of the operation
   * @param nonce the nonce of the operation
   */
  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
  }

  /**
   * Create the log key for writing to the WAL.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
   * @param tablename name of the table
   * @param now Time at which this edit was written.
   * @param clusterIds the clusters that have consumed the change (used in replication)
   * @param nonceGroup the nonce group of the operation
   * @param nonce the nonce of the operation
   */
  public HLogKey(final byte [] encodedRegionName, final TableName tablename,
      final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    init(encodedRegionName, tablename, HLog.NO_SEQUENCE_ID, now, clusterIds,
      nonceGroup, nonce);
  }

  /**
   * Create the log key for writing to the WAL.
   * We maintain the tablename mainly for debugging purposes.
   * A regionName is always a sub-table object.
   *
   * @param encodedRegionName Encoded name of the region as returned by
   * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
   * @param tablename name of the table
   * @param logSeqNum log sequence number
   * @param nonceGroup the nonce group of the operation
   * @param nonce the nonce of the operation
   */
  public HLogKey(final byte [] encodedRegionName, final TableName tablename, long logSeqNum,
      long nonceGroup, long nonce) {
    init(encodedRegionName, tablename, logSeqNum, EnvironmentEdgeManager.currentTime(),
      EMPTY_UUIDS, nonceGroup, nonce);
  }

  protected void init(final byte [] encodedRegionName, final TableName tablename,
      long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
    this.logSeqNum = logSeqNum;
    this.writeTime = now;
    this.clusterIds = clusterIds;
    this.encodedRegionName = encodedRegionName;
    this.tablename = tablename;
    this.nonceGroup = nonceGroup;
    this.nonce = nonce;
  }

  /**
   * @param compressionContext Compression context to use
   */
  public void setCompressionContext(CompressionContext compressionContext) {
    this.compressionContext = compressionContext;
  }

  /** @return encoded region name */
  public byte [] getEncodedRegionName() {
    return encodedRegionName;
  }

  /** @return table name */
  public TableName getTablename() {
    return tablename;
  }

  /** @return log sequence number */
  public long getLogSeqNum() {
    return this.logSeqNum;
  }

  /**
   * Allow the log sequence id to be set post-construction and release all waiters on the
   * assigned sequence number.
   * @param sequence the assigned sequence number
   */
  void setLogSeqNum(final long sequence) {
    this.logSeqNum = sequence;
    this.seqNumAssignedLatch.countDown();
  }

  /**
   * Used to set the original sequence id of this HLogKey during WAL replay.
   * @param seqId the original sequence number of the WALEdit
   */
  public void setOrigLogSeqNum(final long seqId) {
    this.origLogSeqNum = seqId;
  }

  /**
   * Returns a positive long if this HLogKey was created from a replayed edit.
   * @return original sequence number of the WALEdit
   */
  public long getOrigLogSeqNum() {
    return this.origLogSeqNum;
  }

  /**
   * Wait until the sequence number is assigned and return the assigned value.
   * @return the newly assigned sequence number
   * @throws IOException if interrupted while waiting for the assignment
   */
  @Override
  public long getSequenceId() throws IOException {
    try {
      this.seqNumAssignedLatch.await();
    } catch (InterruptedException ie) {
      LOG.warn("Thread interrupted waiting for next log sequence number");
      InterruptedIOException iie = new InterruptedIOException();
      iie.initCause(ie);
      throw iie;
    }
    return this.logSeqNum;
  }
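  // The latch handshake, sketched: the thread appending this edit to the WAL
  // eventually calls setLogSeqNum(seq), which counts the latch down and
  // unblocks every thread parked in getSequenceId() above.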

  /**
   * @return the write time
   */
  public long getWriteTime() {
    return this.writeTime;
  }

  public NavigableMap<byte[], Integer> getScopes() {
    return scopes;
  }

  /** @return The nonce group */
  public long getNonceGroup() {
    return nonceGroup;
  }

  /** @return The nonce */
  public long getNonce() {
    return nonce;
  }

  public void setScopes(NavigableMap<byte[], Integer> scopes) {
    this.scopes = scopes;
  }

  public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
    if (scopes != null) {
      Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
          .iterator();
      while (iterator.hasNext()) {
        Map.Entry<byte[], Integer> scope = iterator.next();
        String key = Bytes.toString(scope.getKey());
        if (key.startsWith(PREFIX_CLUSTER_KEY)) {
          addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
              .length())));
          iterator.remove();
        }
      }
      if (scopes.size() > 0) {
        this.scopes = scopes;
      }
    }
  }
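  // Worked example (family name illustrative): given scopes
  // {".0f1a2b3c-..." -> 1, "cf" -> 1}, the "."-prefixed entry becomes a
  // cluster id via addClusterId() and is removed, leaving {"cf" -> 1}.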

  /**
   * Marks that the cluster with the given clusterId has consumed the change.
   */
  public void addClusterId(UUID clusterId) {
    if (!clusterIds.contains(clusterId)) {
      clusterIds.add(clusterId);
    }
  }

  /**
   * @return the set of cluster Ids that have consumed the change
   */
  public List<UUID> getClusterIds() {
    return clusterIds;
  }

  /**
   * @return the cluster id on which the change has originated. If there is no such cluster, it
   *         returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
   */
  public UUID getOriginatingClusterId() {
    return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
  }

  @Override
  public String toString() {
    return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
      logSeqNum;
  }

  /**
   * Produces a string map for this key. Useful for programmatic use and
   * manipulation of the data stored in an HLogKey, for example, printing
   * as JSON.
   *
   * @return a Map containing data from this key
   */
  public Map<String, Object> toStringMap() {
    Map<String, Object> stringMap = new HashMap<String, Object>();
    stringMap.put("table", tablename);
    stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
    stringMap.put("sequence", logSeqNum);
    return stringMap;
  }
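  // Example toStringMap() output for a key on table "t1" (values
  // illustrative): {table=t1, region=<encoded name>, sequence=42}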

  @Override
  public boolean equals(Object obj) {
    if (this == obj) {
      return true;
    }
    if (obj == null || getClass() != obj.getClass()) {
      return false;
    }
    return compareTo((HLogKey)obj) == 0;
  }

  @Override
  public int hashCode() {
    int result = Bytes.hashCode(this.encodedRegionName);
    result ^= this.logSeqNum;
    result ^= this.writeTime;
    return result;
  }

  @Override
  public int compareTo(HLogKey o) {
    int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
    if (result == 0) {
      if (this.logSeqNum < o.logSeqNum) {
        result = -1;
      } else if (this.logSeqNum > o.logSeqNum) {
        result = 1;
      }
      if (result == 0) {
        if (this.writeTime < o.writeTime) {
          result = -1;
        } else if (this.writeTime > o.writeTime) {
          return 1;
        }
      }
    }
    // why isn't cluster id accounted for?
    return result;
  }

  /**
   * Drop this instance's tablename byte array and instead
   * hold a reference to the provided tablename. This is not
   * meant to be a general purpose setter - it's only used
   * to collapse references to conserve memory.
   */
  void internTableName(TableName tablename) {
    // We should not use this as a setter - only to swap
    // in a new reference to the same table name.
    assert tablename.equals(this.tablename);
    this.tablename = tablename;
  }

  /**
   * Drop this instance's region name byte array and instead
   * hold a reference to the provided region name. This is not
   * meant to be a general purpose setter - it's only used
   * to collapse references to conserve memory.
   */
  void internEncodedRegionName(byte[] encodedRegionName) {
    // We should not use this as a setter - only to swap
    // in a new reference to the same region name.
    assert Bytes.equals(this.encodedRegionName, encodedRegionName);
    this.encodedRegionName = encodedRegionName;
  }

  @Override
  @Deprecated
  public void write(DataOutput out) throws IOException {
    LOG.warn("HLogKey is being serialized to writable - only expected in test code");
    WritableUtils.writeVInt(out, VERSION.code);
    if (compressionContext == null) {
      Bytes.writeByteArray(out, this.encodedRegionName);
      Bytes.writeByteArray(out, this.tablename.getName());
    } else {
      Compressor.writeCompressed(this.encodedRegionName, 0,
          this.encodedRegionName.length, out,
          compressionContext.regionDict);
      Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
          compressionContext.tableDict);
    }
    out.writeLong(this.logSeqNum);
    out.writeLong(this.writeTime);
    // Don't need to write the clusters information as we are using protobufs from 0.95.
    // Writing only the first clusterId for testing the legacy read.
    Iterator<UUID> iterator = clusterIds.iterator();
    if (iterator.hasNext()) {
      out.writeBoolean(true);
      UUID clusterId = iterator.next();
      out.writeLong(clusterId.getMostSignificantBits());
      out.writeLong(clusterId.getLeastSignificantBits());
    } else {
      out.writeBoolean(false);
    }
  }
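  // For reference, the legacy Writable layout produced above when
  // uncompressed: vint version code, vint-length-prefixed encoded region
  // name, vint-length-prefixed table name, sequence number (long), write
  // time (long), then a boolean and, if true, the two longs of the first
  // cluster UUID.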

  @Override
  public void readFields(DataInput in) throws IOException {
    Version version = Version.UNVERSIONED;
    // HLogKey was not versioned in the beginning.
    // In order to introduce it now, we make use of the fact
    // that encodedRegionName was written with Bytes.writeByteArray,
    // which encodes the array length as a vint which is >= 0.
    // Hence if the vint is >= 0 we have an old version and the vint
    // encodes the length of encodedRegionName.
    // If < 0 we just read the version and the next vint is the length.
    // @see Bytes#readByteArray(DataInput)
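    // Worked example: a first vint of 36 is an unversioned key whose encoded
    // region name is 36 bytes long; a first vint of -2 marks a COMPRESSED
    // key, whose length (when not dictionary-compressed) is the next vint.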
    this.scopes = null; // writable HLogKey does not contain scopes
    int len = WritableUtils.readVInt(in);
    byte[] tablenameBytes = null;
    if (len < 0) {
      // what we just read was the version
      version = Version.fromCode(len);
      // We only compress V2 of HLogKey.
      // If compression is on, the length is handled by the dictionary.
      if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
        len = WritableUtils.readVInt(in);
      }
    }
    if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
      this.encodedRegionName = new byte[len];
      in.readFully(this.encodedRegionName);
      tablenameBytes = Bytes.readByteArray(in);
    } else {
      this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
      tablenameBytes = Compressor.readCompressed(in, compressionContext.tableDict);
    }

    this.logSeqNum = in.readLong();
    this.writeTime = in.readLong();

    this.clusterIds.clear();
    if (version.atLeast(Version.INITIAL)) {
      if (in.readBoolean()) {
        // Read the older log: this is definitely the originating cluster.
        clusterIds.add(new UUID(in.readLong(), in.readLong()));
      }
    } else {
      try {
        // dummy read (former byte cluster id)
        in.readByte();
      } catch (EOFException e) {
        // Means it's a very old key; just continue.
      }
    }
    try {
      this.tablename = TableName.valueOf(tablenameBytes);
    } catch (IllegalArgumentException iae) {
      if (Bytes.toString(tablenameBytes).equals(TableName.OLD_META_STR)) {
        // It is a pre-namespace meta table edit; continue with the new format.
        LOG.info("Got an old .META. edit, continuing with new format");
        this.tablename = TableName.META_TABLE_NAME;
        this.encodedRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
      } else if (Bytes.toString(tablenameBytes).equals(TableName.OLD_ROOT_STR)) {
        this.tablename = TableName.OLD_ROOT_TABLE_NAME;
        throw iae;
      } else {
        throw iae;
      }
    }
    // Do not need to read the clusters information as we are using protobufs from 0.95.
  }

  public WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
      throws IOException {
    WALKey.Builder builder = WALKey.newBuilder();
    if (compressionContext == null) {
      builder.setEncodedRegionName(ByteStringer.wrap(this.encodedRegionName));
      builder.setTableName(ByteStringer.wrap(this.tablename.getName()));
    } else {
      builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
        compressionContext.regionDict));
      builder.setTableName(compressor.compress(this.tablename.getName(),
        compressionContext.tableDict));
    }
    builder.setLogSequenceNumber(this.logSeqNum);
    builder.setWriteTime(writeTime);
    if (this.origLogSeqNum > 0) {
      builder.setOrigSequenceNumber(this.origLogSeqNum);
    }
    if (this.nonce != HConstants.NO_NONCE) {
      builder.setNonce(nonce);
    }
    if (this.nonceGroup != HConstants.NO_NONCE) {
      builder.setNonceGroup(nonceGroup);
    }
    HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
    for (UUID clusterId : clusterIds) {
      uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
      uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
      builder.addClusterIds(uuidBuilder.build());
    }
    if (scopes != null) {
      for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
        ByteString family = (compressionContext == null) ? ByteStringer.wrap(e.getKey())
            : compressor.compress(e.getKey(), compressionContext.familyDict);
        builder.addScopes(FamilyScope.newBuilder()
            .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
      }
    }
    return builder;
  }
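  // A minimal protobuf round-trip sketch (the compressor/uncompressor pair
  // would come from a WALCellCodec; illustrative, not a test):
  //   WALKey pbKey = key.getBuilder(compressor).build();
  //   HLogKey copy = new HLogKey();
  //   copy.readFieldsFromPb(pbKey, uncompressor);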

  public void readFieldsFromPb(
      WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
    if (this.compressionContext != null) {
      this.encodedRegionName = uncompressor.uncompress(
          walKey.getEncodedRegionName(), compressionContext.regionDict);
      byte[] tablenameBytes = uncompressor.uncompress(
          walKey.getTableName(), compressionContext.tableDict);
      this.tablename = TableName.valueOf(tablenameBytes);
    } else {
      this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
      this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
    }
    clusterIds.clear();
    if (walKey.hasClusterId()) {
      // When we are reading an older log (0.95.1 release),
      // this is definitely the originating cluster.
      clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
          .getLeastSigBits()));
    }
    for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
      clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
    }
    if (walKey.hasNonceGroup()) {
      this.nonceGroup = walKey.getNonceGroup();
    }
    if (walKey.hasNonce()) {
      this.nonce = walKey.getNonce();
    }
    this.scopes = null;
    if (walKey.getScopesCount() > 0) {
      this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
      for (FamilyScope scope : walKey.getScopesList()) {
        byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
          uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
        this.scopes.put(family, scope.getScopeType().getNumber());
      }
    }
    this.logSeqNum = walKey.getLogSequenceNumber();
    this.writeTime = walKey.getWriteTime();
    if (walKey.hasOrigSequenceNumber()) {
      this.origLogSeqNum = walKey.getOrigSequenceNumber();
    }
  }
}