View Javadoc

1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.util.ArrayList;
26  import java.util.HashMap;
27  import java.util.Iterator;
28  import java.util.List;
29  import java.util.Map;
30  import java.util.NavigableMap;
31  import java.util.TreeMap;
32  import java.util.UUID;
33  
34  import com.google.protobuf.HBaseZeroCopyByteString;
35  import org.apache.commons.logging.Log;
36  import org.apache.commons.logging.LogFactory;
37  import org.apache.hadoop.classification.InterfaceAudience;
38  import org.apache.hadoop.hbase.HConstants;
39  import org.apache.hadoop.hbase.HRegionInfo;
40  import org.apache.hadoop.hbase.TableName;
41  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
42  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
43  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
44  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
45  import org.apache.hadoop.hbase.util.Bytes;
46  import org.apache.hadoop.io.WritableComparable;
47  import org.apache.hadoop.io.WritableUtils;
48  
49  import com.google.common.annotations.VisibleForTesting;
50  import com.google.protobuf.ByteString;
51  
52  /**
53   * A Key for an entry in the change log.
54   *
55   * The log intermingles edits to many tables and rows, so each log entry
56   * identifies the appropriate table and row.  Within a table and row, they're
57   * also sorted.
58   *
59   * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
60   * associated row.
61   */
62  // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
63  //       purposes. They need to be merged into HLogEntry.
64  @InterfaceAudience.Private
65  public class HLogKey implements WritableComparable<HLogKey> {
66    public static final Log LOG = LogFactory.getLog(HLogKey.class);
67  
68    // should be < 0 (@see #readFields(DataInput))
69    // version 2 supports HLog compression
70    enum Version {
71      UNVERSIONED(0),
72      // Initial number we put on HLogKey when we introduced versioning.
73      INITIAL(-1),
74      // Version -2 introduced a dictionary compression facility.  Only this
75      // dictionary-based compression is available in version -2.
76      COMPRESSED(-2);
77  
78      final int code;
79      static final Version[] byCode;
80      static {
81        byCode = Version.values();
82        for (int i = 0; i < byCode.length; i++) {
83          if (byCode[i].code != -1 * i) {
84            throw new AssertionError("Values in this enum should be descending by one");
85          }
86        }
87      }
88  
89      Version(int code) {
90        this.code = code;
91      }
92  
93      boolean atLeast(Version other) {
94        return code <= other.code;
95      }
96  
97      static Version fromCode(int code) {
98        return byCode[code * -1];
99      }
100   }
101 
102   /*
103    * This is used for reading the log entries created by the previous releases
104    * (0.94.11) which write the clusters information to the scopes of WALEdit.
105    */
106   private static final String PREFIX_CLUSTER_KEY = ".";
107 
108 
109   private static final Version VERSION = Version.COMPRESSED;
110 
111   //  The encoded region name.
112   private byte [] encodedRegionName;
113   private TableName tablename;
114   private long logSeqNum;
115   // Time at which this edit was written.
116   private long writeTime;
117 
118   // The first element in the list is the cluster id on which the change has originated
119   private List<UUID> clusterIds;
120 
121   private NavigableMap<byte[], Integer> scopes;
122 
123   private long nonceGroup = HConstants.NO_NONCE;
124   private long nonce = HConstants.NO_NONCE;
125 
126   private CompressionContext compressionContext;
127 
128   public HLogKey() {
129     init(null, null, 0L, HConstants.LATEST_TIMESTAMP,
130         new ArrayList<UUID>(), HConstants.NO_NONCE, HConstants.NO_NONCE);
131   }
132 
133   @VisibleForTesting
134   public HLogKey(final byte[] encodedRegionName, final TableName tablename, long logSeqNum,
135       final long now, UUID clusterId) {
136     List<UUID> clusterIds = new ArrayList<UUID>();
137     clusterIds.add(clusterId);
138     init(encodedRegionName, tablename, logSeqNum, now, clusterIds,
139         HConstants.NO_NONCE, HConstants.NO_NONCE);
140   }
141 
142   /**
143    * Create the log key for writing to somewhere.
144    * We maintain the tablename mainly for debugging purposes.
145    * A regionName is always a sub-table object.
146    *
147    * @param encodedRegionName Encoded name of the region as returned by
148    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
149    * @param tablename   - name of table
150    * @param logSeqNum   - log sequence number
151    * @param now Time at which this edit was written.
152    * @param clusterIds the clusters that have consumed the change(used in Replication)
153    */
154   public HLogKey(final byte [] encodedRegionName, final TableName tablename,
155       long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
156     init(encodedRegionName, tablename, logSeqNum, now, clusterIds, nonceGroup, nonce);
157   }
158 
159   protected void init(final byte [] encodedRegionName, final TableName tablename,
160       long logSeqNum, final long now, List<UUID> clusterIds, long nonceGroup, long nonce) {
161     this.logSeqNum = logSeqNum;
162     this.writeTime = now;
163     this.clusterIds = clusterIds;
164     this.encodedRegionName = encodedRegionName;
165     this.tablename = tablename;
166     this.nonceGroup = nonceGroup;
167     this.nonce = nonce;
168   }
169 
170   /**
171    * @param compressionContext Compression context to use
172    */
173   public void setCompressionContext(CompressionContext compressionContext) {
174     this.compressionContext = compressionContext;
175   }
176 
177   /** @return encoded region name */
178   public byte [] getEncodedRegionName() {
179     return encodedRegionName;
180   }
181 
182   /** @return table name */
183   public TableName getTablename() {
184     return tablename;
185   }
186 
187   /** @return log sequence number */
188   public long getLogSeqNum() {
189     return this.logSeqNum;
190   }
191 
192   /**
193    * Allow that the log sequence id to be set post-construction.
194    * @param sequence
195    */
196   void setLogSeqNum(final long sequence) {
197     this.logSeqNum = sequence;
198   }
199 
200   /**
201    * @return the write time
202    */
203   public long getWriteTime() {
204     return this.writeTime;
205   }
206 
207   public NavigableMap<byte[], Integer> getScopes() {
208     return scopes;
209   }
210 
211   /** @return The nonce group */
212   public long getNonceGroup() {
213     return nonceGroup;
214   }
215 
216   /** @return The nonce */
217   public long getNonce() {
218     return nonce;
219   }
220 
221   public void setScopes(NavigableMap<byte[], Integer> scopes) {
222     this.scopes = scopes;
223   }
224 
225   public void readOlderScopes(NavigableMap<byte[], Integer> scopes) {
226     if (scopes != null) {
227       Iterator<Map.Entry<byte[], Integer>> iterator = scopes.entrySet()
228           .iterator();
229       while (iterator.hasNext()) {
230         Map.Entry<byte[], Integer> scope = iterator.next();
231         String key = Bytes.toString(scope.getKey());
232         if (key.startsWith(PREFIX_CLUSTER_KEY)) {
233           addClusterId(UUID.fromString(key.substring(PREFIX_CLUSTER_KEY
234               .length())));
235           iterator.remove();
236         }
237       }
238       if (scopes.size() > 0) {
239         this.scopes = scopes;
240       }
241     }
242   }
243 
244   /**
245    * Marks that the cluster with the given clusterId has consumed the change
246    */
247   public void addClusterId(UUID clusterId) {
248     if (!clusterIds.contains(clusterId)) {
249       clusterIds.add(clusterId);
250     }
251   }
252 
253   /**
254    * @return the set of cluster Ids that have consumed the change
255    */
256   public List<UUID> getClusterIds() {
257     return clusterIds;
258   }
259 
260   /**
261    * @return the cluster id on which the change has originated. It there is no such cluster, it
262    *         returns DEFAULT_CLUSTER_ID (cases where replication is not enabled)
263    */
264   public UUID getOriginatingClusterId(){
265     return clusterIds.isEmpty() ? HConstants.DEFAULT_CLUSTER_ID : clusterIds.get(0);
266   }
267 
268   @Override
269   public String toString() {
270     return tablename + "/" + Bytes.toString(encodedRegionName) + "/" +
271       logSeqNum;
272   }
273 
274   /**
275    * Produces a string map for this key. Useful for programmatic use and
276    * manipulation of the data stored in an HLogKey, for example, printing
277    * as JSON.
278    *
279    * @return a Map containing data from this key
280    */
281   public Map<String, Object> toStringMap() {
282     Map<String, Object> stringMap = new HashMap<String, Object>();
283     stringMap.put("table", tablename);
284     stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
285     stringMap.put("sequence", logSeqNum);
286     return stringMap;
287   }
288 
289   @Override
290   public boolean equals(Object obj) {
291     if (this == obj) {
292       return true;
293     }
294     if (obj == null || getClass() != obj.getClass()) {
295       return false;
296     }
297     return compareTo((HLogKey)obj) == 0;
298   }
299 
300   @Override
301   public int hashCode() {
302     int result = Bytes.hashCode(this.encodedRegionName);
303     result ^= this.logSeqNum;
304     result ^= this.writeTime;
305     return result;
306   }
307 
308   public int compareTo(HLogKey o) {
309     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
310     if (result == 0) {
311       if (this.logSeqNum < o.logSeqNum) {
312         result = -1;
313       } else if (this.logSeqNum  > o.logSeqNum ) {
314         result = 1;
315       }
316       if (result == 0) {
317         if (this.writeTime < o.writeTime) {
318           result = -1;
319         } else if (this.writeTime > o.writeTime) {
320           return 1;
321         }
322       }
323     }
324     // why isn't cluster id accounted for?
325     return result;
326   }
327 
328   /**
329    * Drop this instance's tablename byte array and instead
330    * hold a reference to the provided tablename. This is not
331    * meant to be a general purpose setter - it's only used
332    * to collapse references to conserve memory.
333    */
334   void internTableName(TableName tablename) {
335     // We should not use this as a setter - only to swap
336     // in a new reference to the same table name.
337     assert tablename.equals(this.tablename);
338     this.tablename = tablename;
339   }
340 
341   /**
342    * Drop this instance's region name byte array and instead
343    * hold a reference to the provided region name. This is not
344    * meant to be a general purpose setter - it's only used
345    * to collapse references to conserve memory.
346    */
347   void internEncodedRegionName(byte []encodedRegionName) {
348     // We should not use this as a setter - only to swap
349     // in a new reference to the same table name.
350     assert Bytes.equals(this.encodedRegionName, encodedRegionName);
351     this.encodedRegionName = encodedRegionName;
352   }
353 
354   @Override
355   @Deprecated
356   public void write(DataOutput out) throws IOException {
357     LOG.warn("HLogKey is being serialized to writable - only expected in test code");
358     WritableUtils.writeVInt(out, VERSION.code);
359     if (compressionContext == null) {
360       Bytes.writeByteArray(out, this.encodedRegionName);
361       Bytes.writeByteArray(out, this.tablename.getName());
362     } else {
363       Compressor.writeCompressed(this.encodedRegionName, 0,
364           this.encodedRegionName.length, out,
365           compressionContext.regionDict);
366       Compressor.writeCompressed(this.tablename.getName(), 0, this.tablename.getName().length, out,
367           compressionContext.tableDict);
368     }
369     out.writeLong(this.logSeqNum);
370     out.writeLong(this.writeTime);
371     // Don't need to write the clusters information as we are using protobufs from 0.95
372     // Writing only the first clusterId for testing the legacy read
373     Iterator<UUID> iterator = clusterIds.iterator();
374     if(iterator.hasNext()){
375       out.writeBoolean(true);
376       UUID clusterId = iterator.next();
377       out.writeLong(clusterId.getMostSignificantBits());
378       out.writeLong(clusterId.getLeastSignificantBits());
379     } else {
380       out.writeBoolean(false);
381     }
382   }
383 
384   @Override
385   public void readFields(DataInput in) throws IOException {
386     Version version = Version.UNVERSIONED;
387     // HLogKey was not versioned in the beginning.
388     // In order to introduce it now, we make use of the fact
389     // that encodedRegionName was written with Bytes.writeByteArray,
390     // which encodes the array length as a vint which is >= 0.
391     // Hence if the vint is >= 0 we have an old version and the vint
392     // encodes the length of encodedRegionName.
393     // If < 0 we just read the version and the next vint is the length.
394     // @see Bytes#readByteArray(DataInput)
395     this.scopes = null; // writable HLogKey does not contain scopes
396     int len = WritableUtils.readVInt(in);
397     byte[] tablenameBytes = null;
398     if (len < 0) {
399       // what we just read was the version
400       version = Version.fromCode(len);
401       // We only compress V2 of HLogkey.
402       // If compression is on, the length is handled by the dictionary
403       if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
404         len = WritableUtils.readVInt(in);
405       }
406     }
407     if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
408       this.encodedRegionName = new byte[len];
409       in.readFully(this.encodedRegionName);
410       tablenameBytes = Bytes.readByteArray(in);
411     } else {
412       this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
413       tablenameBytes = Compressor.readCompressed(in, compressionContext.tableDict);
414     }
415 
416     this.logSeqNum = in.readLong();
417     this.writeTime = in.readLong();
418 
419     this.clusterIds.clear();
420     if (version.atLeast(Version.INITIAL)) {
421       if (in.readBoolean()) {
422         // read the older log
423         // Definitely is the originating cluster
424         clusterIds.add(new UUID(in.readLong(), in.readLong()));
425       }
426     } else {
427       try {
428         // dummy read (former byte cluster id)
429         in.readByte();
430       } catch(EOFException e) {
431         // Means it's a very old key, just continue
432       }
433     }
434     try {
435       this.tablename = TableName.valueOf(tablenameBytes);
436     } catch (IllegalArgumentException iae) {
437       if (Bytes.toString(tablenameBytes).equals(TableName.OLD_META_STR)) {
438         // It is a pre-namespace meta table edit, continue with new format.
439         LOG.info("Got an old .META. edit, continuing with new format ");
440         this.tablename = TableName.META_TABLE_NAME;
441         this.encodedRegionName = HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
442       } else if (Bytes.toString(tablenameBytes).equals(TableName.OLD_ROOT_STR)) {
443         this.tablename = TableName.OLD_ROOT_TABLE_NAME;
444          throw iae;
445       } else throw iae;
446     }
447     // Do not need to read the clusters information as we are using protobufs from 0.95
448   }
449 
450   public WALKey.Builder getBuilder(WALCellCodec.ByteStringCompressor compressor)
451   throws IOException {
452     WALKey.Builder builder = WALKey.newBuilder();
453     if (compressionContext == null) {
454       builder.setEncodedRegionName(HBaseZeroCopyByteString.wrap(this.encodedRegionName));
455       builder.setTableName(HBaseZeroCopyByteString.wrap(this.tablename.getName()));
456     } else {
457       builder.setEncodedRegionName(compressor.compress(this.encodedRegionName,
458         compressionContext.regionDict));
459       builder.setTableName(compressor.compress(this.tablename.getName(),
460         compressionContext.tableDict));
461     }
462     builder.setLogSequenceNumber(this.logSeqNum);
463     builder.setWriteTime(writeTime);
464     if (this.nonce != HConstants.NO_NONCE) {
465       builder.setNonce(nonce);
466     }
467     if (this.nonceGroup != HConstants.NO_NONCE) {
468       builder.setNonceGroup(nonceGroup);
469     }
470     HBaseProtos.UUID.Builder uuidBuilder = HBaseProtos.UUID.newBuilder();
471     for (UUID clusterId : clusterIds) {
472       uuidBuilder.setLeastSigBits(clusterId.getLeastSignificantBits());
473       uuidBuilder.setMostSigBits(clusterId.getMostSignificantBits());
474       builder.addClusterIds(uuidBuilder.build());
475     }
476     if (scopes != null) {
477       for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
478         ByteString family = (compressionContext == null) ?
479             HBaseZeroCopyByteString.wrap(e.getKey())
480             : compressor.compress(e.getKey(), compressionContext.familyDict);
481         builder.addScopes(FamilyScope.newBuilder()
482             .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
483       }
484     }
485     return builder;
486   }
487 
488   public void readFieldsFromPb(
489       WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
490     if (this.compressionContext != null) {
491       this.encodedRegionName = uncompressor.uncompress(
492           walKey.getEncodedRegionName(), compressionContext.regionDict);
493       byte[] tablenameBytes = uncompressor.uncompress(
494           walKey.getTableName(), compressionContext.tableDict);
495       this.tablename = TableName.valueOf(tablenameBytes);
496     } else {
497       this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
498       this.tablename = TableName.valueOf(walKey.getTableName().toByteArray());
499     }
500     clusterIds.clear();
501     if (walKey.hasClusterId()) {
502       //When we are reading the older log (0.95.1 release)
503       //This is definitely the originating cluster
504       clusterIds.add(new UUID(walKey.getClusterId().getMostSigBits(), walKey.getClusterId()
505           .getLeastSigBits()));
506     }
507     for (HBaseProtos.UUID clusterId : walKey.getClusterIdsList()) {
508       clusterIds.add(new UUID(clusterId.getMostSigBits(), clusterId.getLeastSigBits()));
509     }
510     if (walKey.hasNonceGroup()) {
511       this.nonceGroup = walKey.getNonceGroup();
512     }
513     if (walKey.hasNonce()) {
514       this.nonce = walKey.getNonce();
515     }
516     this.scopes = null;
517     if (walKey.getScopesCount() > 0) {
518       this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
519       for (FamilyScope scope : walKey.getScopesList()) {
520         byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
521           uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
522         this.scopes.put(family, scope.getScopeType().getNumber());
523       }
524     }
525     this.logSeqNum = walKey.getLogSequenceNumber();
526     this.writeTime = walKey.getWriteTime();
527   }
528 }