1   /**
2    *
3    * Licensed to the Apache Software Foundation (ASF) under one
4    * or more contributor license agreements.  See the NOTICE file
5    * distributed with this work for additional information
6    * regarding copyright ownership.  The ASF licenses this file
7    * to you under the Apache License, Version 2.0 (the
8    * "License"); you may not use this file except in compliance
9    * with the License.  You may obtain a copy of the License at
10   *
11   *     http://www.apache.org/licenses/LICENSE-2.0
12   *
13   * Unless required by applicable law or agreed to in writing, software
14   * distributed under the License is distributed on an "AS IS" BASIS,
15   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16   * See the License for the specific language governing permissions and
17   * limitations under the License.
18   */
19  package org.apache.hadoop.hbase.regionserver.wal;
20  
21  import java.io.DataInput;
22  import java.io.DataOutput;
23  import java.io.EOFException;
24  import java.io.IOException;
25  import java.nio.ByteBuffer;
26  import java.util.HashMap;
27  import java.util.Map;
28  import java.util.NavigableMap;
29  import java.util.TreeMap;
30  import java.util.UUID;
31  
32  import org.apache.commons.logging.Log;
33  import org.apache.commons.logging.LogFactory;
34  import org.apache.hadoop.classification.InterfaceAudience;
35  import org.apache.hadoop.hbase.HConstants;
36  import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
37  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.FamilyScope;
38  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.ScopeType;
39  import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey;
40  import org.apache.hadoop.hbase.util.Bytes;
41  import org.apache.hadoop.io.WritableComparable;
42  import org.apache.hadoop.io.WritableUtils;
43  
44  import com.google.protobuf.ByteString;
45  
46  /**
47   * A Key for an entry in the change log.
48   *
49   * The log intermingles edits to many tables and rows, so each log entry
50   * identifies the appropriate table and row.  Within a table and row, they're
51   * also sorted.
52   *
53   * <p>Some Transactional edits (START, COMMIT, ABORT) will not have an
54   * associated row.
55   */
56  // TODO: Key and WALEdit are never used separately, or in one-to-many relation, for practical
57  //       purposes. They need to be merged into HLogEntry.
58  @InterfaceAudience.Private
59  public class HLogKey implements WritableComparable<HLogKey> {
60    public static final Log LOG = LogFactory.getLog(HLogKey.class);
61  
62    // should be < 0 (@see #readFields(DataInput))
63    // version 2 supports HLog compression
64    enum Version {
65      UNVERSIONED(0),
66      // Initial number we put on HLogKey when we introduced versioning.
67      INITIAL(-1),
68      // Version -2 introduced a dictionary compression facility.  Only this
69      // dictionary-based compression is available in version -2.
70      COMPRESSED(-2);
71  
72      final int code;
73      static final Version[] byCode;
74      static {
75        byCode = Version.values();
76        for (int i = 0; i < byCode.length; i++) {
77          if (byCode[i].code != -1 * i) {
78            throw new AssertionError("Values in this enum should be descending by one");
79          }
80        }
81      }
82  
83      Version(int code) {
84        this.code = code;
85      }
86  
87      boolean atLeast(Version other) {
88        return code <= other.code;
89      }
90  
91      static Version fromCode(int code) {
92        return byCode[code * -1];
93      }
94    }
95  
96    private static final Version VERSION = Version.COMPRESSED;
97  
98    //  The encoded region name.
99    private byte [] encodedRegionName;
100   private byte [] tablename;
101   private long logSeqNum;
102   // Time at which this edit was written.
103   private long writeTime;
104 
105   private UUID clusterId;
106 
107   private NavigableMap<byte[], Integer> scopes;
108 
109   private CompressionContext compressionContext;
110 
111   public HLogKey() {
112     this(null, null, 0L, HConstants.LATEST_TIMESTAMP,
113         HConstants.DEFAULT_CLUSTER_ID);
114   }
115 
116   /**
117    * Create the log key for writing to somewhere.
118    * We maintain the tablename mainly for debugging purposes.
119    * A regionName is always a sub-table object.
120    *
121    * @param encodedRegionName Encoded name of the region as returned by
122    * <code>HRegionInfo#getEncodedNameAsBytes()</code>.
123    * @param tablename   - name of table
124    * @param logSeqNum   - log sequence number
125    * @param now Time at which this edit was written.
126    * @param clusterId of the cluster (used in Replication)
127    */
128   public HLogKey(final byte [] encodedRegionName, final byte [] tablename,
129       long logSeqNum, final long now, UUID clusterId) {
130     this.logSeqNum = logSeqNum;
131     this.writeTime = now;
132     this.clusterId = clusterId;
133     this.encodedRegionName = encodedRegionName;
134     this.tablename = tablename;
135   }
136 
137   /**
138    * Create HLogKey wrapper around protobuf WAL key; takes care of compression.
139    * @throws IOException Never, as the compression is not enabled.
140    */
141   public HLogKey(WALKey walKey) throws IOException {
142     readFieldsFromPb(walKey, null);
143   }
144 
145   /**
146    * @param compressionContext Compression context to use
147    */
148   public void setCompressionContext(CompressionContext compressionContext) {
149     this.compressionContext = compressionContext;
150   }
151 
152   /** @return encoded region name */
153   public byte [] getEncodedRegionName() {
154     return encodedRegionName;
155   }
156 
157   /** @return table name */
158   public byte [] getTablename() {
159     return tablename;
160   }
161 
162   /** @return log sequence number */
163   public long getLogSeqNum() {
164     return this.logSeqNum;
165   }
166 
167   /**
168    * @return the write time
169    */
170   public long getWriteTime() {
171     return this.writeTime;
172   }
173 
174   /**
175    * Get the id of the original cluster
176    * @return Cluster id.
177    */
178   public UUID getClusterId() {
179     return clusterId;
180   }
181 
182   public NavigableMap<byte[], Integer> getScopes() {
183     return scopes;
184   }
185 
186   public void setScopes(NavigableMap<byte[], Integer> scopes) {
187     this.scopes = scopes;
188   }
189 
190   /**
191    * Set the cluster id of this key.
192    * @param clusterId
193    */
194   public void setClusterId(UUID clusterId) {
195     this.clusterId = clusterId;
196   }
197 
198   @Override
199   public String toString() {
200     return Bytes.toString(tablename) + "/" + Bytes.toString(encodedRegionName) + "/" +
201       logSeqNum;
202   }
203 
204   /**
205    * Produces a string map for this key. Useful for programmatic use and
206    * manipulation of the data stored in an HLogKey, for example, printing 
207    * as JSON.
208    * 
209    * @return a Map containing data from this key
210    */
211   public Map<String, Object> toStringMap() {
212     Map<String, Object> stringMap = new HashMap<String, Object>();
213     stringMap.put("table", Bytes.toStringBinary(tablename));
214     stringMap.put("region", Bytes.toStringBinary(encodedRegionName));
215     stringMap.put("sequence", logSeqNum);
216     return stringMap;
217   }
218 
219   @Override
220   public boolean equals(Object obj) {
221     if (this == obj) {
222       return true;
223     }
224     if (obj == null || getClass() != obj.getClass()) {
225       return false;
226     }
227     return compareTo((HLogKey)obj) == 0;
228   }
229 
230   @Override
231   public int hashCode() {
232     int result = Bytes.hashCode(this.encodedRegionName);
233     result ^= this.logSeqNum;
234     result ^= this.writeTime;
235     result ^= this.clusterId.hashCode();
236     return result;
237   }
238 
239   public int compareTo(HLogKey o) {
240     int result = Bytes.compareTo(this.encodedRegionName, o.encodedRegionName);
241     if (result == 0) {
242       if (this.logSeqNum < o.logSeqNum) {
243         result = -1;
244       } else if (this.logSeqNum  > o.logSeqNum ) {
245         result = 1;
246       }
247       if (result == 0) {
248         if (this.writeTime < o.writeTime) {
249           result = -1;
250         } else if (this.writeTime > o.writeTime) {
251           return 1;
252         }
253       }
254     }
255     // why isn't cluster id accounted for?
256     return result;
257   }
258 
259   /**
260    * Drop this instance's tablename byte array and instead
261    * hold a reference to the provided tablename. This is not
262    * meant to be a general purpose setter - it's only used
263    * to collapse references to conserve memory.
264    */
265   void internTableName(byte []tablename) {
266     // We should not use this as a setter - only to swap
267     // in a new reference to the same table name.
268     assert Bytes.equals(tablename, this.tablename);
269     this.tablename = tablename;
270   }
271 
272   /**
273    * Drop this instance's region name byte array and instead
274    * hold a reference to the provided region name. This is not
275    * meant to be a general purpose setter - it's only used
276    * to collapse references to conserve memory.
277    */
278   void internEncodedRegionName(byte []encodedRegionName) {
279     // We should not use this as a setter - only to swap
280     // in a new reference to the same table name.
281     assert Bytes.equals(this.encodedRegionName, encodedRegionName);
282     this.encodedRegionName = encodedRegionName;
283   }
284 
285   @Override
286   @Deprecated
287   public void write(DataOutput out) throws IOException {
288     LOG.warn("HLogKey is being serialized to writable - only expected in test code");
289     WritableUtils.writeVInt(out, VERSION.code);
290     if (compressionContext == null) {
291       Bytes.writeByteArray(out, this.encodedRegionName);
292       Bytes.writeByteArray(out, this.tablename);
293     } else {
294       Compressor.writeCompressed(this.encodedRegionName, 0,
295           this.encodedRegionName.length, out,
296           compressionContext.regionDict);
297       Compressor.writeCompressed(this.tablename, 0, this.tablename.length, out,
298           compressionContext.tableDict);
299     }
300     out.writeLong(this.logSeqNum);
301     out.writeLong(this.writeTime);
302     // avoid storing 16 bytes when replication is not enabled
303     if (this.clusterId == HConstants.DEFAULT_CLUSTER_ID) {
304       out.writeBoolean(false);
305     } else {
306       out.writeBoolean(true);
307       out.writeLong(this.clusterId.getMostSignificantBits());
308       out.writeLong(this.clusterId.getLeastSignificantBits());
309     }
310   }
311 
312   @Override
313   public void readFields(DataInput in) throws IOException {
314     Version version = Version.UNVERSIONED;
315     // HLogKey was not versioned in the beginning.
316     // In order to introduce it now, we make use of the fact
317     // that encodedRegionName was written with Bytes.writeByteArray,
318     // which encodes the array length as a vint which is >= 0.
319     // Hence if the vint is >= 0 we have an old version and the vint
320     // encodes the length of encodedRegionName.
321     // If < 0 we just read the version and the next vint is the length.
322     // @see Bytes#readByteArray(DataInput)
323     this.scopes = null; // writable HLogKey does not contain scopes
324     int len = WritableUtils.readVInt(in);
325     if (len < 0) {
326       // what we just read was the version
327       version = Version.fromCode(len);
328       // We only compress V2 of HLogkey.
329       // If compression is on, the length is handled by the dictionary
330       if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
331         len = WritableUtils.readVInt(in);
332       }
333     }
334     if (compressionContext == null || !version.atLeast(Version.COMPRESSED)) {
335       this.encodedRegionName = new byte[len];
336       in.readFully(this.encodedRegionName);
337       this.tablename = Bytes.readByteArray(in);
338     } else {
339       this.encodedRegionName = Compressor.readCompressed(in, compressionContext.regionDict);
340       this.tablename = Compressor.readCompressed(in, compressionContext.tableDict);
341     }
342 
343     this.logSeqNum = in.readLong();
344     this.writeTime = in.readLong();
345     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
346     if (version.atLeast(Version.INITIAL)) {
347       if (in.readBoolean()) {
348         this.clusterId = new UUID(in.readLong(), in.readLong());
349       }
350     } else {
351       try {
352         // dummy read (former byte cluster id)
353         in.readByte();
354       } catch(EOFException e) {
355         // Means it's a very old key, just continue
356       }
357     }
358   }
359 
360   public WALKey.Builder getBuilder(
361       WALCellCodec.ByteStringCompressor compressor) throws IOException {
362     WALKey.Builder builder = WALKey.newBuilder();
363     if (compressionContext == null) {
364       builder.setEncodedRegionName(ByteString.copyFrom(this.encodedRegionName));
365       builder.setTableName(ByteString.copyFrom(this.tablename));
366     } else {
367       builder.setEncodedRegionName(
368           compressor.compress(this.encodedRegionName, compressionContext.regionDict));
369       builder.setTableName(compressor.compress(this.tablename, compressionContext.tableDict));
370     }
371     builder.setLogSequenceNumber(this.logSeqNum);
372     builder.setWriteTime(writeTime);
373     if (this.clusterId != HConstants.DEFAULT_CLUSTER_ID) {
374       builder.setClusterId(HBaseProtos.UUID.newBuilder()
375           .setLeastSigBits(this.clusterId.getLeastSignificantBits())
376           .setMostSigBits(this.clusterId.getMostSignificantBits()));
377     }
378     if (scopes != null) {
379       for (Map.Entry<byte[], Integer> e : scopes.entrySet()) {
380         ByteString family = (compressionContext == null) ? ByteString.copyFrom(e.getKey())
381             : compressor.compress(e.getKey(), compressionContext.familyDict);
382         builder.addScopes(FamilyScope.newBuilder()
383             .setFamily(family).setScopeType(ScopeType.valueOf(e.getValue())));
384       }
385     }
386     return builder;
387   }
388 
389   public void readFieldsFromPb(
390       WALKey walKey, WALCellCodec.ByteStringUncompressor uncompressor) throws IOException {
391     if (this.compressionContext != null) {
392       this.encodedRegionName = uncompressor.uncompress(
393           walKey.getEncodedRegionName(), compressionContext.regionDict);
394       this.tablename = uncompressor.uncompress(
395           walKey.getTableName(), compressionContext.tableDict);
396     } else {
397       this.encodedRegionName = walKey.getEncodedRegionName().toByteArray();
398       this.tablename = walKey.getTableName().toByteArray();
399     }
400     this.clusterId = HConstants.DEFAULT_CLUSTER_ID;
401     if (walKey.hasClusterId()) {
402       this.clusterId = new UUID(
403           walKey.getClusterId().getMostSigBits(), walKey.getClusterId().getLeastSigBits());
404     }
405     this.scopes = null;
406     if (walKey.getScopesCount() > 0) {
407       this.scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
408       for (FamilyScope scope : walKey.getScopesList()) {
409         byte[] family = (compressionContext == null) ? scope.getFamily().toByteArray() :
410           uncompressor.uncompress(scope.getFamily(), compressionContext.familyDict);
411         this.scopes.put(family, scope.getScopeType().getNumber());
412       }
413     }
414     this.logSeqNum = walKey.getLogSequenceNumber();
415     this.writeTime = walKey.getWriteTime();
416   }
417 }