View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  package org.apache.hadoop.hbase.util;
19  
20  import static com.google.common.base.Preconditions.checkArgument;
21  import static com.google.common.base.Preconditions.checkNotNull;
22  import static com.google.common.base.Preconditions.checkPositionIndex;
23  
24  import java.io.DataInput;
25  import java.io.DataOutput;
26  import java.io.IOException;
27  import java.lang.reflect.Field;
28  import java.math.BigDecimal;
29  import java.math.BigInteger;
30  import java.nio.ByteBuffer;
31  import java.nio.ByteOrder;
32  import java.nio.charset.Charset;
33  import java.security.AccessController;
34  import java.security.PrivilegedAction;
35  import java.security.SecureRandom;
36  import java.util.Arrays;
37  import java.util.Collection;
38  import java.util.Comparator;
39  import java.util.Iterator;
40  import java.util.List;
41  
42  import com.google.protobuf.ByteString;
43  
44  import org.apache.commons.logging.Log;
45  import org.apache.commons.logging.LogFactory;
46  import org.apache.hadoop.hbase.classification.InterfaceAudience;
47  import org.apache.hadoop.hbase.classification.InterfaceStability;
48  import org.apache.hadoop.hbase.Cell;
49  import org.apache.hadoop.hbase.KeyValue;
50  import org.apache.hadoop.io.RawComparator;
51  import org.apache.hadoop.io.WritableComparator;
52  import org.apache.hadoop.io.WritableUtils;
53  
54  import sun.misc.Unsafe;
55  
56  import com.google.common.annotations.VisibleForTesting;
57  import com.google.common.collect.Lists;
58  
59  import org.apache.hadoop.hbase.util.Bytes.LexicographicalComparerHolder.UnsafeComparer;
60  
61  /**
62   * Utility class that handles byte arrays, conversions to/from other types,
63   * comparisons, hash code generation, manufacturing keys for HashMaps or
64   * HashSets, and can be used as key in maps or trees.
65   */
66  @SuppressWarnings("restriction")
67  @InterfaceAudience.Public
68  @InterfaceStability.Stable
69  @edu.umd.cs.findbugs.annotations.SuppressWarnings(
70      value="EQ_CHECK_FOR_OPERAND_NOT_COMPATIBLE_WITH_THIS",
71      justification="It has been like this forever")
72  public class Bytes implements Comparable<Bytes> {
73    //HConstants.UTF8_ENCODING should be updated if this changed
74    /** When we encode strings, we always specify UTF8 encoding */
75    private static final String UTF8_ENCODING = "UTF-8";
76  
77    //HConstants.UTF8_CHARSET should be updated if this changed
78    /** When we encode strings, we always specify UTF8 encoding */
79    private static final Charset UTF8_CHARSET = Charset.forName(UTF8_ENCODING);
80  
81    //HConstants.EMPTY_BYTE_ARRAY should be updated if this changed
82    private static final byte [] EMPTY_BYTE_ARRAY = new byte [0];
83  
84    private static final Log LOG = LogFactory.getLog(Bytes.class);
85  
86    /**
87     * Size of boolean in bytes
88     */
89    public static final int SIZEOF_BOOLEAN = Byte.SIZE / Byte.SIZE;
90  
91    /**
92     * Size of byte in bytes
93     */
94    public static final int SIZEOF_BYTE = SIZEOF_BOOLEAN;
95  
96    /**
97     * Size of char in bytes
98     */
99    public static final int SIZEOF_CHAR = Character.SIZE / Byte.SIZE;
100 
101   /**
102    * Size of double in bytes
103    */
104   public static final int SIZEOF_DOUBLE = Double.SIZE / Byte.SIZE;
105 
106   /**
107    * Size of float in bytes
108    */
109   public static final int SIZEOF_FLOAT = Float.SIZE / Byte.SIZE;
110 
111   /**
112    * Size of int in bytes
113    */
114   public static final int SIZEOF_INT = Integer.SIZE / Byte.SIZE;
115 
116   /**
117    * Size of long in bytes
118    */
119   public static final int SIZEOF_LONG = Long.SIZE / Byte.SIZE;
120 
121   /**
122    * Size of short in bytes
123    */
124   public static final int SIZEOF_SHORT = Short.SIZE / Byte.SIZE;
125 
126   /**
127    * Mask to apply to a long to reveal the lower int only. Use like this:
128    * int i = (int)(0xFFFFFFFF00000000L ^ some_long_value);
129    */
130   public static final long MASK_FOR_LOWER_INT_IN_LONG = 0xFFFFFFFF00000000L;
131 
132   /**
133    * Estimate of size cost to pay beyond payload in jvm for instance of byte [].
134    * Estimate based on study of jhat and jprofiler numbers.
135    */
136   // JHat says BU is 56 bytes.
137   // SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
138   public static final int ESTIMATED_HEAP_TAX = 16;
139 
140 
141   /**
142    * Returns length of the byte array, returning 0 if the array is null.
143    * Useful for calculating sizes.
144    * @param b byte array, which can be null
145    * @return 0 if b is null, otherwise returns length
146    */
147   final public static int len(byte[] b) {
148     return b == null ? 0 : b.length;
149   }
150 
151   private byte[] bytes;
152   private int offset;
153   private int length;
154 
155   /**
156    * Create a zero-size sequence.
157    */
158   public Bytes() {
159     super();
160   }
161 
162   /**
163    * Create a Bytes using the byte array as the initial value.
164    * @param bytes This array becomes the backing storage for the object.
165    */
166   public Bytes(byte[] bytes) {
167     this(bytes, 0, bytes.length);
168   }
169 
170   /**
171    * Set the new Bytes to the contents of the passed
172    * <code>ibw</code>.
173    * @param ibw the value to set this Bytes to.
174    */
175   public Bytes(final Bytes ibw) {
176     this(ibw.get(), ibw.getOffset(), ibw.getLength());
177   }
178 
179   /**
180    * Set the value to a given byte range
181    * @param bytes the new byte range to set to
182    * @param offset the offset in newData to start at
183    * @param length the number of bytes in the range
184    */
185   public Bytes(final byte[] bytes, final int offset,
186       final int length) {
187     this.bytes = bytes;
188     this.offset = offset;
189     this.length = length;
190   }
191 
192   /**
193    * Copy bytes from ByteString instance.
194    * @param byteString copy from
195    */
196   public Bytes(final ByteString byteString) {
197     this(byteString.toByteArray());
198   }
199 
200   /**
201    * Get the data from the Bytes.
202    * @return The data is only valid between offset and offset+length.
203    */
204   public byte [] get() {
205     if (this.bytes == null) {
206       throw new IllegalStateException("Uninitialiized. Null constructor " +
207           "called w/o accompaying readFields invocation");
208     }
209     return this.bytes;
210   }
211 
212   /**
213    * @param b Use passed bytes as backing array for this instance.
214    */
215   public void set(final byte [] b) {
216     set(b, 0, b.length);
217   }
218 
219   /**
220    * @param b Use passed bytes as backing array for this instance.
221    * @param offset
222    * @param length
223    */
224   public void set(final byte [] b, final int offset, final int length) {
225     this.bytes = b;
226     this.offset = offset;
227     this.length = length;
228   }
229 
230   /**
231    * @return the number of valid bytes in the buffer
232    * @deprecated use {@link #getLength()} instead
233    */
234   @Deprecated
235   public int getSize() {
236     if (this.bytes == null) {
237       throw new IllegalStateException("Uninitialiized. Null constructor " +
238           "called w/o accompaying readFields invocation");
239     }
240     return this.length;
241   }
242 
243   /**
244    * @return the number of valid bytes in the buffer
245    */
246   public int getLength() {
247     if (this.bytes == null) {
248       throw new IllegalStateException("Uninitialiized. Null constructor " +
249           "called w/o accompaying readFields invocation");
250     }
251     return this.length;
252   }
253 
254   /**
255    * @return offset
256    */
257   public int getOffset(){
258     return this.offset;
259   }
260 
261   public ByteString toByteString() {
262     return ByteString.copyFrom(this.bytes, this.offset, this.length);
263   }
264 
265   @Override
266   public int hashCode() {
267     return Bytes.hashCode(bytes, offset, length);
268   }
269 
270   /**
271    * Define the sort order of the Bytes.
272    * @param that The other bytes writable
273    * @return Positive if left is bigger than right, 0 if they are equal, and
274    *         negative if left is smaller than right.
275    */
276   public int compareTo(Bytes that) {
277     return BYTES_RAWCOMPARATOR.compare(
278         this.bytes, this.offset, this.length,
279         that.bytes, that.offset, that.length);
280   }
281 
282   /**
283    * Compares the bytes in this object to the specified byte array
284    * @param that
285    * @return Positive if left is bigger than right, 0 if they are equal, and
286    *         negative if left is smaller than right.
287    */
288   public int compareTo(final byte [] that) {
289     return BYTES_RAWCOMPARATOR.compare(
290         this.bytes, this.offset, this.length,
291         that, 0, that.length);
292   }
293 
294   /**
295    * @see Object#equals(Object)
296    */
297   @Override
298   public boolean equals(Object right_obj) {
299     if (right_obj instanceof byte []) {
300       return compareTo((byte [])right_obj) == 0;
301     }
302     if (right_obj instanceof Bytes) {
303       return compareTo((Bytes)right_obj) == 0;
304     }
305     return false;
306   }
307 
308   /**
309    * @see Object#toString()
310    */
311   @Override
312   public String toString() {
313     return Bytes.toString(bytes, offset, length);
314   }
315 
316   /**
317    * @param array List of byte [].
318    * @return Array of byte [].
319    */
320   public static byte [][] toArray(final List<byte []> array) {
321     // List#toArray doesn't work on lists of byte [].
322     byte[][] results = new byte[array.size()][];
323     for (int i = 0; i < array.size(); i++) {
324       results[i] = array.get(i);
325     }
326     return results;
327   }
328 
329   /**
330    * Returns a copy of the bytes referred to by this writable
331    */
332   public byte[] copyBytes() {
333     return Arrays.copyOfRange(bytes, offset, offset+length);
334   }
335   /**
336    * Byte array comparator class.
337    */
338   @InterfaceAudience.Public
339   @InterfaceStability.Stable
340   public static class ByteArrayComparator implements RawComparator<byte []> {
341     /**
342      * Constructor
343      */
344     public ByteArrayComparator() {
345       super();
346     }
347     @Override
348     public int compare(byte [] left, byte [] right) {
349       return compareTo(left, right);
350     }
351     @Override
352     public int compare(byte [] b1, int s1, int l1, byte [] b2, int s2, int l2) {
353       return LexicographicalComparerHolder.BEST_COMPARER.
354         compareTo(b1, s1, l1, b2, s2, l2);
355     }
356   }
357 
358   /**
359    * A {@link ByteArrayComparator} that treats the empty array as the largest value.
360    * This is useful for comparing row end keys for regions.
361    */
362   // TODO: unfortunately, HBase uses byte[0] as both start and end keys for region
363   // boundaries. Thus semantically, we should treat empty byte array as the smallest value
364   // while comparing row keys, start keys etc; but as the largest value for comparing
365   // region boundaries for endKeys.
366   @InterfaceAudience.Public
367   @InterfaceStability.Stable
368   public static class RowEndKeyComparator extends ByteArrayComparator {
369     @Override
370     public int compare(byte[] left, byte[] right) {
371       return compare(left, 0, left.length, right, 0, right.length);
372     }
373     @Override
374     public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
375       if (b1 == b2 && s1 == s2 && l1 == l2) {
376         return 0;
377       }
378       if (l1 == 0) {
379         return l2; //0 or positive
380       }
381       if (l2 == 0) {
382         return -1;
383       }
384       return super.compare(b1, s1, l1, b2, s2, l2);
385     }
386   }
387 
388   /**
389    * Pass this to TreeMaps where byte [] are keys.
390    */
391   public final static Comparator<byte []> BYTES_COMPARATOR = new ByteArrayComparator();
392 
393   /**
394    * Use comparing byte arrays, byte-by-byte
395    */
396   public final static RawComparator<byte []> BYTES_RAWCOMPARATOR = new ByteArrayComparator();
397 
398   /**
399    * Read byte-array written with a WritableableUtils.vint prefix.
400    * @param in Input to read from.
401    * @return byte array read off <code>in</code>
402    * @throws IOException e
403    */
404   public static byte [] readByteArray(final DataInput in)
405   throws IOException {
406     int len = WritableUtils.readVInt(in);
407     if (len < 0) {
408       throw new NegativeArraySizeException(Integer.toString(len));
409     }
410     byte [] result = new byte[len];
411     in.readFully(result, 0, len);
412     return result;
413   }
414 
415   /**
416    * Read byte-array written with a WritableableUtils.vint prefix.
417    * IOException is converted to a RuntimeException.
418    * @param in Input to read from.
419    * @return byte array read off <code>in</code>
420    */
421   public static byte [] readByteArrayThrowsRuntime(final DataInput in) {
422     try {
423       return readByteArray(in);
424     } catch (Exception e) {
425       throw new RuntimeException(e);
426     }
427   }
428 
429   /**
430    * Write byte-array with a WritableableUtils.vint prefix.
431    * @param out output stream to be written to
432    * @param b array to write
433    * @throws IOException e
434    */
435   public static void writeByteArray(final DataOutput out, final byte [] b)
436   throws IOException {
437     if(b == null) {
438       WritableUtils.writeVInt(out, 0);
439     } else {
440       writeByteArray(out, b, 0, b.length);
441     }
442   }
443 
444   /**
445    * Write byte-array to out with a vint length prefix.
446    * @param out output stream
447    * @param b array
448    * @param offset offset into array
449    * @param length length past offset
450    * @throws IOException e
451    */
452   public static void writeByteArray(final DataOutput out, final byte [] b,
453       final int offset, final int length)
454   throws IOException {
455     WritableUtils.writeVInt(out, length);
456     out.write(b, offset, length);
457   }
458 
459   /**
460    * Write byte-array from src to tgt with a vint length prefix.
461    * @param tgt target array
462    * @param tgtOffset offset into target array
463    * @param src source array
464    * @param srcOffset source offset
465    * @param srcLength source length
466    * @return New offset in src array.
467    */
468   public static int writeByteArray(final byte [] tgt, final int tgtOffset,
469       final byte [] src, final int srcOffset, final int srcLength) {
470     byte [] vint = vintToBytes(srcLength);
471     System.arraycopy(vint, 0, tgt, tgtOffset, vint.length);
472     int offset = tgtOffset + vint.length;
473     System.arraycopy(src, srcOffset, tgt, offset, srcLength);
474     return offset + srcLength;
475   }
476 
477   /**
478    * Put bytes at the specified byte array position.
479    * @param tgtBytes the byte array
480    * @param tgtOffset position in the array
481    * @param srcBytes array to write out
482    * @param srcOffset source offset
483    * @param srcLength source length
484    * @return incremented offset
485    */
486   public static int putBytes(byte[] tgtBytes, int tgtOffset, byte[] srcBytes,
487       int srcOffset, int srcLength) {
488     System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
489     return tgtOffset + srcLength;
490   }
491 
492   /**
493    * Write a single byte out to the specified byte array position.
494    * @param bytes the byte array
495    * @param offset position in the array
496    * @param b byte to write out
497    * @return incremented offset
498    */
499   public static int putByte(byte[] bytes, int offset, byte b) {
500     bytes[offset] = b;
501     return offset + 1;
502   }
503 
504   /**
505    * Add the whole content of the ByteBuffer to the bytes arrays. The ByteBuffer is modified.
506    * @param bytes the byte array
507    * @param offset position in the array
508    * @param buf ByteBuffer to write out
509    * @return incremented offset
510    */
511   public static int putByteBuffer(byte[] bytes, int offset, ByteBuffer buf) {
512     int len = buf.remaining();
513     buf.get(bytes, offset, len);
514     return offset + len;
515   }
516 
517   /**
518    * Returns a new byte array, copied from the given {@code buf},
519    * from the index 0 (inclusive) to the limit (exclusive),
520    * regardless of the current position.
521    * The position and the other index parameters are not changed.
522    *
523    * @param buf a byte buffer
524    * @return the byte array
525    * @see #getBytes(ByteBuffer)
526    */
527   public static byte[] toBytes(ByteBuffer buf) {
528     ByteBuffer dup = buf.duplicate();
529     dup.position(0);
530     return readBytes(dup);
531   }
532 
533   private static byte[] readBytes(ByteBuffer buf) {
534     byte [] result = new byte[buf.remaining()];
535     buf.get(result);
536     return result;
537   }
538 
539   /**
540    * @param b Presumed UTF-8 encoded byte array.
541    * @return String made from <code>b</code>
542    */
543   public static String toString(final byte [] b) {
544     if (b == null) {
545       return null;
546     }
547     return toString(b, 0, b.length);
548   }
549 
550   /**
551    * Joins two byte arrays together using a separator.
552    * @param b1 The first byte array.
553    * @param sep The separator to use.
554    * @param b2 The second byte array.
555    */
556   public static String toString(final byte [] b1,
557                                 String sep,
558                                 final byte [] b2) {
559     return toString(b1, 0, b1.length) + sep + toString(b2, 0, b2.length);
560   }
561 
562   /**
563    * This method will convert utf8 encoded bytes into a string. If
564    * the given byte array is null, this method will return null.
565    *
566    * @param b Presumed UTF-8 encoded byte array.
567    * @param off offset into array
568    * @return String made from <code>b</code> or null
569    */
570   public static String toString(final byte [] b, int off) {
571     if (b == null) {
572       return null;
573     }
574     int len = b.length - off;
575     if (len <= 0) {
576       return "";
577     }
578     return new String(b, off, len, UTF8_CHARSET);
579   }
580 
581   /**
582    * This method will convert utf8 encoded bytes into a string. If
583    * the given byte array is null, this method will return null.
584    *
585    * @param b Presumed UTF-8 encoded byte array.
586    * @param off offset into array
587    * @param len length of utf-8 sequence
588    * @return String made from <code>b</code> or null
589    */
590   public static String toString(final byte [] b, int off, int len) {
591     if (b == null) {
592       return null;
593     }
594     if (len == 0) {
595       return "";
596     }
597     return new String(b, off, len, UTF8_CHARSET);
598   }
599 
600   /**
601    * Write a printable representation of a byte array.
602    *
603    * @param b byte array
604    * @return string
605    * @see #toStringBinary(byte[], int, int)
606    */
607   public static String toStringBinary(final byte [] b) {
608     if (b == null)
609       return "null";
610     return toStringBinary(b, 0, b.length);
611   }
612 
613   /**
614    * Converts the given byte buffer to a printable representation,
615    * from the index 0 (inclusive) to the limit (exclusive),
616    * regardless of the current position.
617    * The position and the other index parameters are not changed.
618    *
619    * @param buf a byte buffer
620    * @return a string representation of the buffer's binary contents
621    * @see #toBytes(ByteBuffer)
622    * @see #getBytes(ByteBuffer)
623    */
624   public static String toStringBinary(ByteBuffer buf) {
625     if (buf == null)
626       return "null";
627     if (buf.hasArray()) {
628       return toStringBinary(buf.array(), buf.arrayOffset(), buf.limit());
629     }
630     return toStringBinary(toBytes(buf));
631   }
632 
633   /**
634    * Write a printable representation of a byte array. Non-printable
635    * characters are hex escaped in the format \\x%02X, eg:
636    * \x00 \x05 etc
637    *
638    * @param b array to write out
639    * @param off offset to start at
640    * @param len length to write
641    * @return string output
642    */
643   public static String toStringBinary(final byte [] b, int off, int len) {
644     StringBuilder result = new StringBuilder();
645     // Just in case we are passed a 'len' that is > buffer length...
646     if (off >= b.length) return result.toString();
647     if (off + len > b.length) len = b.length - off;
648     for (int i = off; i < off + len ; ++i) {
649       int ch = b[i] & 0xFF;
650       if ( (ch >= '0' && ch <= '9')
651           || (ch >= 'A' && ch <= 'Z')
652           || (ch >= 'a' && ch <= 'z')
653           || " `~!@#$%^&*()-_=+[]{}|;:'\",.<>/?".indexOf(ch) >= 0) {
654         result.append((char)ch);
655       } else {
656         result.append(String.format("\\x%02X", ch));
657       }
658     }
659     return result.toString();
660   }
661 
662   private static boolean isHexDigit(char c) {
663     return
664         (c >= 'A' && c <= 'F') ||
665         (c >= '0' && c <= '9');
666   }
667 
668   /**
669    * Takes a ASCII digit in the range A-F0-9 and returns
670    * the corresponding integer/ordinal value.
671    * @param ch  The hex digit.
672    * @return The converted hex value as a byte.
673    */
674   public static byte toBinaryFromHex(byte ch) {
675     if (ch >= 'A' && ch <= 'F')
676       return (byte) ((byte)10 + (byte) (ch - 'A'));
677     // else
678     return (byte) (ch - '0');
679   }
680 
681   public static byte [] toBytesBinary(String in) {
682     // this may be bigger than we need, but let's be safe.
683     byte [] b = new byte[in.length()];
684     int size = 0;
685     for (int i = 0; i < in.length(); ++i) {
686       char ch = in.charAt(i);
687       if (ch == '\\' && in.length() > i+1 && in.charAt(i+1) == 'x') {
688         // ok, take next 2 hex digits.
689         char hd1 = in.charAt(i+2);
690         char hd2 = in.charAt(i+3);
691 
692         // they need to be A-F0-9:
693         if (!isHexDigit(hd1) ||
694             !isHexDigit(hd2)) {
695           // bogus escape code, ignore:
696           continue;
697         }
698         // turn hex ASCII digit -> number
699         byte d = (byte) ((toBinaryFromHex((byte)hd1) << 4) + toBinaryFromHex((byte)hd2));
700 
701         b[size++] = d;
702         i += 3; // skip 3
703       } else {
704         b[size++] = (byte) ch;
705       }
706     }
707     // resize:
708     byte [] b2 = new byte[size];
709     System.arraycopy(b, 0, b2, 0, size);
710     return b2;
711   }
712 
713   /**
714    * Converts a string to a UTF-8 byte array.
715    * @param s string
716    * @return the byte array
717    */
718   public static byte[] toBytes(String s) {
719     return s.getBytes(UTF8_CHARSET);
720   }
721 
722   /**
723    * Convert a boolean to a byte array. True becomes -1
724    * and false becomes 0.
725    *
726    * @param b value
727    * @return <code>b</code> encoded in a byte array.
728    */
729   public static byte [] toBytes(final boolean b) {
730     return new byte[] { b ? (byte) -1 : (byte) 0 };
731   }
732 
733   /**
734    * Reverses {@link #toBytes(boolean)}
735    * @param b array
736    * @return True or false.
737    */
738   public static boolean toBoolean(final byte [] b) {
739     if (b.length != 1) {
740       throw new IllegalArgumentException("Array has wrong size: " + b.length);
741     }
742     return b[0] != (byte) 0;
743   }
744 
745   /**
746    * Convert a long value to a byte array using big-endian.
747    *
748    * @param val value to convert
749    * @return the byte array
750    */
751   public static byte[] toBytes(long val) {
752     byte [] b = new byte[8];
753     for (int i = 7; i > 0; i--) {
754       b[i] = (byte) val;
755       val >>>= 8;
756     }
757     b[0] = (byte) val;
758     return b;
759   }
760 
761   /**
762    * Converts a byte array to a long value. Reverses
763    * {@link #toBytes(long)}
764    * @param bytes array
765    * @return the long value
766    */
767   public static long toLong(byte[] bytes) {
768     return toLong(bytes, 0, SIZEOF_LONG);
769   }
770 
771   /**
772    * Converts a byte array to a long value. Assumes there will be
773    * {@link #SIZEOF_LONG} bytes available.
774    *
775    * @param bytes bytes
776    * @param offset offset
777    * @return the long value
778    */
779   public static long toLong(byte[] bytes, int offset) {
780     return toLong(bytes, offset, SIZEOF_LONG);
781   }
782 
783   /**
784    * Converts a byte array to a long value.
785    *
786    * @param bytes array of bytes
787    * @param offset offset into array
788    * @param length length of data (must be {@link #SIZEOF_LONG})
789    * @return the long value
790    * @throws IllegalArgumentException if length is not {@link #SIZEOF_LONG} or
791    * if there's not enough room in the array at the offset indicated.
792    */
793   public static long toLong(byte[] bytes, int offset, final int length) {
794     if (length != SIZEOF_LONG || offset + length > bytes.length) {
795       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_LONG);
796     }
797     if (UnsafeComparer.isAvailable()) {
798       return toLongUnsafe(bytes, offset);
799     } else {
800       long l = 0;
801       for(int i = offset; i < offset + length; i++) {
802         l <<= 8;
803         l ^= bytes[i] & 0xFF;
804       }
805       return l;
806     }
807   }
808 
809   private static IllegalArgumentException
810     explainWrongLengthOrOffset(final byte[] bytes,
811                                final int offset,
812                                final int length,
813                                final int expectedLength) {
814     String reason;
815     if (length != expectedLength) {
816       reason = "Wrong length: " + length + ", expected " + expectedLength;
817     } else {
818      reason = "offset (" + offset + ") + length (" + length + ") exceed the"
819         + " capacity of the array: " + bytes.length;
820     }
821     return new IllegalArgumentException(reason);
822   }
823 
824   /**
825    * Put a long value out to the specified byte array position.
826    * @param bytes the byte array
827    * @param offset position in the array
828    * @param val long to write out
829    * @return incremented offset
830    * @throws IllegalArgumentException if the byte array given doesn't have
831    * enough room at the offset specified.
832    */
833   public static int putLong(byte[] bytes, int offset, long val) {
834     if (bytes.length - offset < SIZEOF_LONG) {
835       throw new IllegalArgumentException("Not enough room to put a long at"
836           + " offset " + offset + " in a " + bytes.length + " byte array");
837     }
838     if (UnsafeComparer.isAvailable()) {
839       return putLongUnsafe(bytes, offset, val);
840     } else {
841       for(int i = offset + 7; i > offset; i--) {
842         bytes[i] = (byte) val;
843         val >>>= 8;
844       }
845       bytes[offset] = (byte) val;
846       return offset + SIZEOF_LONG;
847     }
848   }
849 
850   /**
851    * Put a long value out to the specified byte array position (Unsafe).
852    * @param bytes the byte array
853    * @param offset position in the array
854    * @param val long to write out
855    * @return incremented offset
856    */
857   public static int putLongUnsafe(byte[] bytes, int offset, long val)
858   {
859     if (UnsafeComparer.littleEndian) {
860       val = Long.reverseBytes(val);
861     }
862     UnsafeComparer.theUnsafe.putLong(bytes, (long) offset +
863       UnsafeComparer.BYTE_ARRAY_BASE_OFFSET , val);
864     return offset + SIZEOF_LONG;
865   }
866 
867   /**
868    * Presumes float encoded as IEEE 754 floating-point "single format"
869    * @param bytes byte array
870    * @return Float made from passed byte array.
871    */
872   public static float toFloat(byte [] bytes) {
873     return toFloat(bytes, 0);
874   }
875 
876   /**
877    * Presumes float encoded as IEEE 754 floating-point "single format"
878    * @param bytes array to convert
879    * @param offset offset into array
880    * @return Float made from passed byte array.
881    */
882   public static float toFloat(byte [] bytes, int offset) {
883     return Float.intBitsToFloat(toInt(bytes, offset, SIZEOF_INT));
884   }
885 
886   /**
887    * @param bytes byte array
888    * @param offset offset to write to
889    * @param f float value
890    * @return New offset in <code>bytes</code>
891    */
892   public static int putFloat(byte [] bytes, int offset, float f) {
893     return putInt(bytes, offset, Float.floatToRawIntBits(f));
894   }
895 
896   /**
897    * @param f float value
898    * @return the float represented as byte []
899    */
900   public static byte [] toBytes(final float f) {
901     // Encode it as int
902     return Bytes.toBytes(Float.floatToRawIntBits(f));
903   }
904 
905   /**
906    * @param bytes byte array
907    * @return Return double made from passed bytes.
908    */
909   public static double toDouble(final byte [] bytes) {
910     return toDouble(bytes, 0);
911   }
912 
913   /**
914    * @param bytes byte array
915    * @param offset offset where double is
916    * @return Return double made from passed bytes.
917    */
918   public static double toDouble(final byte [] bytes, final int offset) {
919     return Double.longBitsToDouble(toLong(bytes, offset, SIZEOF_LONG));
920   }
921 
922   /**
923    * @param bytes byte array
924    * @param offset offset to write to
925    * @param d value
926    * @return New offset into array <code>bytes</code>
927    */
928   public static int putDouble(byte [] bytes, int offset, double d) {
929     return putLong(bytes, offset, Double.doubleToLongBits(d));
930   }
931 
932   /**
933    * Serialize a double as the IEEE 754 double format output. The resultant
934    * array will be 8 bytes long.
935    *
936    * @param d value
937    * @return the double represented as byte []
938    */
939   public static byte [] toBytes(final double d) {
940     // Encode it as a long
941     return Bytes.toBytes(Double.doubleToRawLongBits(d));
942   }
943 
944   /**
945    * Convert an int value to a byte array.  Big-endian.  Same as what DataOutputStream.writeInt
946    * does.
947    *
948    * @param val value
949    * @return the byte array
950    */
951   public static byte[] toBytes(int val) {
952     byte [] b = new byte[4];
953     for(int i = 3; i > 0; i--) {
954       b[i] = (byte) val;
955       val >>>= 8;
956     }
957     b[0] = (byte) val;
958     return b;
959   }
960 
961   /**
962    * Converts a byte array to an int value
963    * @param bytes byte array
964    * @return the int value
965    */
966   public static int toInt(byte[] bytes) {
967     return toInt(bytes, 0, SIZEOF_INT);
968   }
969 
970   /**
971    * Converts a byte array to an int value
972    * @param bytes byte array
973    * @param offset offset into array
974    * @return the int value
975    */
976   public static int toInt(byte[] bytes, int offset) {
977     return toInt(bytes, offset, SIZEOF_INT);
978   }
979 
980   /**
981    * Converts a byte array to an int value
982    * @param bytes byte array
983    * @param offset offset into array
984    * @param length length of int (has to be {@link #SIZEOF_INT})
985    * @return the int value
986    * @throws IllegalArgumentException if length is not {@link #SIZEOF_INT} or
987    * if there's not enough room in the array at the offset indicated.
988    */
989   public static int toInt(byte[] bytes, int offset, final int length) {
990     if (length != SIZEOF_INT || offset + length > bytes.length) {
991       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_INT);
992     }
993     if (UnsafeComparer.isAvailable()) {
994       return toIntUnsafe(bytes, offset);
995     } else {
996       int n = 0;
997       for(int i = offset; i < (offset + length); i++) {
998         n <<= 8;
999         n ^= bytes[i] & 0xFF;
1000       }
1001       return n;
1002     }
1003   }
1004 
1005   /**
1006    * Converts a byte array to an int value (Unsafe version)
1007    * @param bytes byte array
1008    * @param offset offset into array
1009    * @return the int value
1010    */
1011   public static int toIntUnsafe(byte[] bytes, int offset) {
1012     if (UnsafeComparer.littleEndian) {
1013       return Integer.reverseBytes(UnsafeComparer.theUnsafe.getInt(bytes,
1014         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET));
1015     } else {
1016       return UnsafeComparer.theUnsafe.getInt(bytes,
1017         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET);
1018     }
1019   }
1020 
1021   /**
1022    * Converts a byte array to an short value (Unsafe version)
1023    * @param bytes byte array
1024    * @param offset offset into array
1025    * @return the short value
1026    */
1027   public static short toShortUnsafe(byte[] bytes, int offset) {
1028     if (UnsafeComparer.littleEndian) {
1029       return Short.reverseBytes(UnsafeComparer.theUnsafe.getShort(bytes,
1030         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET));
1031     } else {
1032       return UnsafeComparer.theUnsafe.getShort(bytes,
1033         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET);
1034     }
1035   }
1036 
1037   /**
1038    * Converts a byte array to an long value (Unsafe version)
1039    * @param bytes byte array
1040    * @param offset offset into array
1041    * @return the long value
1042    */
1043   public static long toLongUnsafe(byte[] bytes, int offset) {
1044     if (UnsafeComparer.littleEndian) {
1045       return Long.reverseBytes(UnsafeComparer.theUnsafe.getLong(bytes,
1046         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET));
1047     } else {
1048       return UnsafeComparer.theUnsafe.getLong(bytes,
1049         (long) offset + UnsafeComparer.BYTE_ARRAY_BASE_OFFSET);
1050     }
1051   }
1052 
1053   /**
1054    * Converts a byte array to an int value
1055    * @param bytes byte array
1056    * @param offset offset into array
1057    * @param length how many bytes should be considered for creating int
1058    * @return the int value
1059    * @throws IllegalArgumentException if there's not enough room in the array at the offset
1060    * indicated.
1061    */
1062   public static int readAsInt(byte[] bytes, int offset, final int length) {
1063     if (offset + length > bytes.length) {
1064       throw new IllegalArgumentException("offset (" + offset + ") + length (" + length
1065           + ") exceed the" + " capacity of the array: " + bytes.length);
1066     }
1067     int n = 0;
1068     for(int i = offset; i < (offset + length); i++) {
1069       n <<= 8;
1070       n ^= bytes[i] & 0xFF;
1071     }
1072     return n;
1073   }
1074 
1075   /**
1076    * Put an int value out to the specified byte array position.
1077    * @param bytes the byte array
1078    * @param offset position in the array
1079    * @param val int to write out
1080    * @return incremented offset
1081    * @throws IllegalArgumentException if the byte array given doesn't have
1082    * enough room at the offset specified.
1083    */
1084   public static int putInt(byte[] bytes, int offset, int val) {
1085     if (bytes.length - offset < SIZEOF_INT) {
1086       throw new IllegalArgumentException("Not enough room to put an int at"
1087           + " offset " + offset + " in a " + bytes.length + " byte array");
1088     }
1089     if (UnsafeComparer.isAvailable()) {
1090       return putIntUnsafe(bytes, offset, val);
1091     } else {
1092       for(int i= offset + 3; i > offset; i--) {
1093         bytes[i] = (byte) val;
1094         val >>>= 8;
1095       }
1096       bytes[offset] = (byte) val;
1097       return offset + SIZEOF_INT;
1098     }
1099   }
1100 
1101   /**
1102    * Put an int value out to the specified byte array position (Unsafe).
1103    * @param bytes the byte array
1104    * @param offset position in the array
1105    * @param val int to write out
1106    * @return incremented offset
1107    */
1108   public static int putIntUnsafe(byte[] bytes, int offset, int val)
1109   {
1110     if (UnsafeComparer.littleEndian) {
1111       val = Integer.reverseBytes(val);
1112     }
1113     UnsafeComparer.theUnsafe.putInt(bytes, (long) offset +
1114       UnsafeComparer.BYTE_ARRAY_BASE_OFFSET , val);
1115     return offset + SIZEOF_INT;
1116   }
1117 
1118   /**
1119    * Convert a short value to a byte array of {@link #SIZEOF_SHORT} bytes long.
1120    * @param val value
1121    * @return the byte array
1122    */
1123   public static byte[] toBytes(short val) {
1124     byte[] b = new byte[SIZEOF_SHORT];
1125     b[1] = (byte) val;
1126     val >>= 8;
1127     b[0] = (byte) val;
1128     return b;
1129   }
1130 
1131   /**
1132    * Converts a byte array to a short value
1133    * @param bytes byte array
1134    * @return the short value
1135    */
1136   public static short toShort(byte[] bytes) {
1137     return toShort(bytes, 0, SIZEOF_SHORT);
1138   }
1139 
1140   /**
1141    * Converts a byte array to a short value
1142    * @param bytes byte array
1143    * @param offset offset into array
1144    * @return the short value
1145    */
1146   public static short toShort(byte[] bytes, int offset) {
1147     return toShort(bytes, offset, SIZEOF_SHORT);
1148   }
1149 
1150   /**
1151    * Converts a byte array to a short value
1152    * @param bytes byte array
1153    * @param offset offset into array
1154    * @param length length, has to be {@link #SIZEOF_SHORT}
1155    * @return the short value
1156    * @throws IllegalArgumentException if length is not {@link #SIZEOF_SHORT}
1157    * or if there's not enough room in the array at the offset indicated.
1158    */
1159   public static short toShort(byte[] bytes, int offset, final int length) {
1160     if (length != SIZEOF_SHORT || offset + length > bytes.length) {
1161       throw explainWrongLengthOrOffset(bytes, offset, length, SIZEOF_SHORT);
1162     }
1163     if (UnsafeComparer.isAvailable()) {
1164       return toShortUnsafe(bytes, offset);
1165     } else {
1166       short n = 0;
1167       n ^= bytes[offset] & 0xFF;
1168       n <<= 8;
1169       n ^= bytes[offset+1] & 0xFF;
1170       return n;
1171    }
1172   }
1173 
1174   /**
1175    * Returns a new byte array, copied from the given {@code buf},
1176    * from the position (inclusive) to the limit (exclusive).
1177    * The position and the other index parameters are not changed.
1178    *
1179    * @param buf a byte buffer
1180    * @return the byte array
1181    * @see #toBytes(ByteBuffer)
1182    */
1183   public static byte[] getBytes(ByteBuffer buf) {
1184     return readBytes(buf.duplicate());
1185   }
1186 
1187   /**
1188    * Put a short value out to the specified byte array position.
1189    * @param bytes the byte array
1190    * @param offset position in the array
1191    * @param val short to write out
1192    * @return incremented offset
1193    * @throws IllegalArgumentException if the byte array given doesn't have
1194    * enough room at the offset specified.
1195    */
1196   public static int putShort(byte[] bytes, int offset, short val) {
1197     if (bytes.length - offset < SIZEOF_SHORT) {
1198       throw new IllegalArgumentException("Not enough room to put a short at"
1199           + " offset " + offset + " in a " + bytes.length + " byte array");
1200     }
1201     if (UnsafeComparer.isAvailable()) {
1202       return putShortUnsafe(bytes, offset, val);
1203     } else {
1204       bytes[offset+1] = (byte) val;
1205       val >>= 8;
1206       bytes[offset] = (byte) val;
1207       return offset + SIZEOF_SHORT;
1208     }
1209   }
1210 
1211   /**
1212    * Put a short value out to the specified byte array position (Unsafe).
1213    * @param bytes the byte array
1214    * @param offset position in the array
1215    * @param val short to write out
1216    * @return incremented offset
1217    */
1218   public static int putShortUnsafe(byte[] bytes, int offset, short val)
1219   {
1220     if (UnsafeComparer.littleEndian) {
1221       val = Short.reverseBytes(val);
1222     }
1223     UnsafeComparer.theUnsafe.putShort(bytes, (long) offset +
1224       UnsafeComparer.BYTE_ARRAY_BASE_OFFSET , val);
1225     return offset + SIZEOF_SHORT;
1226   }
1227 
1228   /**
1229    * Put an int value as short out to the specified byte array position. Only the lower 2 bytes of
1230    * the short will be put into the array. The caller of the API need to make sure they will not
1231    * loose the value by doing so. This is useful to store an unsigned short which is represented as
1232    * int in other parts.
1233    * @param bytes the byte array
1234    * @param offset position in the array
1235    * @param val value to write out
1236    * @return incremented offset
1237    * @throws IllegalArgumentException if the byte array given doesn't have
1238    * enough room at the offset specified.
1239    */
1240   public static int putAsShort(byte[] bytes, int offset, int val) {
1241     if (bytes.length - offset < SIZEOF_SHORT) {
1242       throw new IllegalArgumentException("Not enough room to put a short at"
1243           + " offset " + offset + " in a " + bytes.length + " byte array");
1244     }
1245     bytes[offset+1] = (byte) val;
1246     val >>= 8;
1247     bytes[offset] = (byte) val;
1248     return offset + SIZEOF_SHORT;
1249   }
1250 
1251   /**
1252    * Convert a BigDecimal value to a byte array
1253    *
1254    * @param val
1255    * @return the byte array
1256    */
1257   public static byte[] toBytes(BigDecimal val) {
1258     byte[] valueBytes = val.unscaledValue().toByteArray();
1259     byte[] result = new byte[valueBytes.length + SIZEOF_INT];
1260     int offset = putInt(result, 0, val.scale());
1261     putBytes(result, offset, valueBytes, 0, valueBytes.length);
1262     return result;
1263   }
1264 
1265 
1266   /**
1267    * Converts a byte array to a BigDecimal
1268    *
1269    * @param bytes
1270    * @return the char value
1271    */
1272   public static BigDecimal toBigDecimal(byte[] bytes) {
1273     return toBigDecimal(bytes, 0, bytes.length);
1274   }
1275 
1276   /**
1277    * Converts a byte array to a BigDecimal value
1278    *
1279    * @param bytes
1280    * @param offset
1281    * @param length
1282    * @return the char value
1283    */
1284   public static BigDecimal toBigDecimal(byte[] bytes, int offset, final int length) {
1285     if (bytes == null || length < SIZEOF_INT + 1 ||
1286       (offset + length > bytes.length)) {
1287       return null;
1288     }
1289 
1290     int scale = toInt(bytes, offset);
1291     byte[] tcBytes = new byte[length - SIZEOF_INT];
1292     System.arraycopy(bytes, offset + SIZEOF_INT, tcBytes, 0, length - SIZEOF_INT);
1293     return new BigDecimal(new BigInteger(tcBytes), scale);
1294   }
1295 
1296   /**
1297    * Put a BigDecimal value out to the specified byte array position.
1298    *
1299    * @param bytes  the byte array
1300    * @param offset position in the array
1301    * @param val    BigDecimal to write out
1302    * @return incremented offset
1303    */
1304   public static int putBigDecimal(byte[] bytes, int offset, BigDecimal val) {
1305     if (bytes == null) {
1306       return offset;
1307     }
1308 
1309     byte[] valueBytes = val.unscaledValue().toByteArray();
1310     byte[] result = new byte[valueBytes.length + SIZEOF_INT];
1311     offset = putInt(result, offset, val.scale());
1312     return putBytes(result, offset, valueBytes, 0, valueBytes.length);
1313   }
1314 
1315   /**
1316    * @param vint Integer to make a vint of.
1317    * @return Vint as bytes array.
1318    */
1319   public static byte [] vintToBytes(final long vint) {
1320     long i = vint;
1321     int size = WritableUtils.getVIntSize(i);
1322     byte [] result = new byte[size];
1323     int offset = 0;
1324     if (i >= -112 && i <= 127) {
1325       result[offset] = (byte) i;
1326       return result;
1327     }
1328 
1329     int len = -112;
1330     if (i < 0) {
1331       i ^= -1L; // take one's complement'
1332       len = -120;
1333     }
1334 
1335     long tmp = i;
1336     while (tmp != 0) {
1337       tmp = tmp >> 8;
1338       len--;
1339     }
1340 
1341     result[offset++] = (byte) len;
1342 
1343     len = (len < -120) ? -(len + 120) : -(len + 112);
1344 
1345     for (int idx = len; idx != 0; idx--) {
1346       int shiftbits = (idx - 1) * 8;
1347       long mask = 0xFFL << shiftbits;
1348       result[offset++] = (byte)((i & mask) >> shiftbits);
1349     }
1350     return result;
1351   }
1352 
1353   /**
1354    * @param buffer buffer to convert
1355    * @return vint bytes as an integer.
1356    */
1357   public static long bytesToVint(final byte [] buffer) {
1358     int offset = 0;
1359     byte firstByte = buffer[offset++];
1360     int len = WritableUtils.decodeVIntSize(firstByte);
1361     if (len == 1) {
1362       return firstByte;
1363     }
1364     long i = 0;
1365     for (int idx = 0; idx < len-1; idx++) {
1366       byte b = buffer[offset++];
1367       i = i << 8;
1368       i = i | (b & 0xFF);
1369     }
1370     return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
1371   }
1372 
1373   /**
1374    * Reads a zero-compressed encoded long from input buffer and returns it.
1375    * @param buffer Binary array
1376    * @param offset Offset into array at which vint begins.
1377    * @throws java.io.IOException e
1378    * @return deserialized long from buffer.
1379    * @deprecated Use {@link #readAsVLong()} instead.
1380    */
1381   @Deprecated
1382   public static long readVLong(final byte [] buffer, final int offset)
1383   throws IOException {
1384     return readAsVLong(buffer, offset);
1385   }
1386 
1387   /**
1388    * Reads a zero-compressed encoded long from input buffer and returns it.
1389    * @param buffer Binary array
1390    * @param offset Offset into array at which vint begins.
1391    * @return deserialized long from buffer.
1392    */
1393   public static long readAsVLong(final byte [] buffer, final int offset) {
1394     byte firstByte = buffer[offset];
1395     int len = WritableUtils.decodeVIntSize(firstByte);
1396     if (len == 1) {
1397       return firstByte;
1398     }
1399     long i = 0;
1400     for (int idx = 0; idx < len-1; idx++) {
1401       byte b = buffer[offset + 1 + idx];
1402       i = i << 8;
1403       i = i | (b & 0xFF);
1404     }
1405     return (WritableUtils.isNegativeVInt(firstByte) ? ~i : i);
1406   }
1407 
1408   /**
1409    * @param left left operand
1410    * @param right right operand
1411    * @return 0 if equal, < 0 if left is less than right, etc.
1412    */
1413   public static int compareTo(final byte [] left, final byte [] right) {
1414     return LexicographicalComparerHolder.BEST_COMPARER.
1415       compareTo(left, 0, left.length, right, 0, right.length);
1416   }
1417 
1418   /**
1419    * Lexicographically compare two arrays.
1420    *
1421    * @param buffer1 left operand
1422    * @param buffer2 right operand
1423    * @param offset1 Where to start comparing in the left buffer
1424    * @param offset2 Where to start comparing in the right buffer
1425    * @param length1 How much to compare from the left buffer
1426    * @param length2 How much to compare from the right buffer
1427    * @return 0 if equal, < 0 if left is less than right, etc.
1428    */
1429   public static int compareTo(byte[] buffer1, int offset1, int length1,
1430       byte[] buffer2, int offset2, int length2) {
1431     return LexicographicalComparerHolder.BEST_COMPARER.
1432       compareTo(buffer1, offset1, length1, buffer2, offset2, length2);
1433   }
1434 
1435   interface Comparer<T> {
1436     int compareTo(
1437       T buffer1, int offset1, int length1, T buffer2, int offset2, int length2
1438     );
1439   }
1440 
1441   @VisibleForTesting
1442   static Comparer<byte[]> lexicographicalComparerJavaImpl() {
1443     return LexicographicalComparerHolder.PureJavaComparer.INSTANCE;
1444   }
1445 
1446   /**
1447    * Provides a lexicographical comparer implementation; either a Java
1448    * implementation or a faster implementation based on {@link Unsafe}.
1449    *
1450    * <p>Uses reflection to gracefully fall back to the Java implementation if
1451    * {@code Unsafe} isn't available.
1452    */
1453   @VisibleForTesting
1454   static class LexicographicalComparerHolder {
1455     static final String UNSAFE_COMPARER_NAME =
1456         LexicographicalComparerHolder.class.getName() + "$UnsafeComparer";
1457 
1458     static final Comparer<byte[]> BEST_COMPARER = getBestComparer();
1459     /**
1460      * Returns the Unsafe-using Comparer, or falls back to the pure-Java
1461      * implementation if unable to do so.
1462      */
1463     static Comparer<byte[]> getBestComparer() {
1464       try {
1465         Class<?> theClass = Class.forName(UNSAFE_COMPARER_NAME);
1466 
1467         // yes, UnsafeComparer does implement Comparer<byte[]>
1468         @SuppressWarnings("unchecked")
1469         Comparer<byte[]> comparer =
1470           (Comparer<byte[]>) theClass.getEnumConstants()[0];
1471         return comparer;
1472       } catch (Throwable t) { // ensure we really catch *everything*
1473         return lexicographicalComparerJavaImpl();
1474       }
1475     }
1476 
1477     enum PureJavaComparer implements Comparer<byte[]> {
1478       INSTANCE;
1479 
1480       @Override
1481       public int compareTo(byte[] buffer1, int offset1, int length1,
1482           byte[] buffer2, int offset2, int length2) {
1483         // Short circuit equal case
1484         if (buffer1 == buffer2 &&
1485             offset1 == offset2 &&
1486             length1 == length2) {
1487           return 0;
1488         }
1489         // Bring WritableComparator code local
1490         int end1 = offset1 + length1;
1491         int end2 = offset2 + length2;
1492         for (int i = offset1, j = offset2; i < end1 && j < end2; i++, j++) {
1493           int a = (buffer1[i] & 0xff);
1494           int b = (buffer2[j] & 0xff);
1495           if (a != b) {
1496             return a - b;
1497           }
1498         }
1499         return length1 - length2;
1500       }
1501     }
1502 
1503     @VisibleForTesting
1504     enum UnsafeComparer implements Comparer<byte[]> {
1505       INSTANCE;
1506 
1507       static final Unsafe theUnsafe;
1508 
1509       /** The offset to the first element in a byte array. */
1510       static final int BYTE_ARRAY_BASE_OFFSET;
1511 
1512       static {
1513         theUnsafe = (Unsafe) AccessController.doPrivileged(
1514             new PrivilegedAction<Object>() {
1515               @Override
1516               public Object run() {
1517                 try {
1518                   Field f = Unsafe.class.getDeclaredField("theUnsafe");
1519                   f.setAccessible(true);
1520                   return f.get(null);
1521                 } catch (NoSuchFieldException e) {
1522                   // It doesn't matter what we throw;
1523                   // it's swallowed in getBestComparer().
1524                   throw new Error();
1525                 } catch (IllegalAccessException e) {
1526                   throw new Error();
1527                 }
1528               }
1529             });
1530 
1531         BYTE_ARRAY_BASE_OFFSET = theUnsafe.arrayBaseOffset(byte[].class);
1532 
1533         // sanity check - this should never fail
1534         if (theUnsafe.arrayIndexScale(byte[].class) != 1) {
1535           throw new AssertionError();
1536         }
1537       }
1538 
1539       static final boolean littleEndian =
1540         ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
1541 
1542       /**
1543        * Returns true if x1 is less than x2, when both values are treated as
1544        * unsigned long.
1545        * Both values are passed as is read by Unsafe. When platform is Little Endian, have to
1546        * convert to corresponding Big Endian value and then do compare. We do all writes in
1547        * Big Endian format.
1548        */
1549       static boolean lessThanUnsignedLong(long x1, long x2) {
1550         if (littleEndian) {
1551           x1 = Long.reverseBytes(x1);
1552           x2 = Long.reverseBytes(x2);
1553         }
1554         return (x1 + Long.MIN_VALUE) < (x2 + Long.MIN_VALUE);
1555       }
1556 
1557       /**
1558        * Returns true if x1 is less than x2, when both values are treated as
1559        * unsigned int.
1560        * Both values are passed as is read by Unsafe. When platform is Little Endian, have to
1561        * convert to corresponding Big Endian value and then do compare. We do all writes in
1562        * Big Endian format.
1563        */
1564       static boolean lessThanUnsignedInt(int x1, int x2) {
1565         if (littleEndian) {
1566           x1 = Integer.reverseBytes(x1);
1567           x2 = Integer.reverseBytes(x2);
1568         }
1569         return (x1 & 0xffffffffL) < (x2 & 0xffffffffL);
1570       }
1571 
1572       /**
1573        * Returns true if x1 is less than x2, when both values are treated as
1574        * unsigned short.
1575        * Both values are passed as is read by Unsafe. When platform is Little Endian, have to
1576        * convert to corresponding Big Endian value and then do compare. We do all writes in
1577        * Big Endian format.
1578        */
1579       static boolean lessThanUnsignedShort(short x1, short x2) {
1580         if (littleEndian) {
1581           x1 = Short.reverseBytes(x1);
1582           x2 = Short.reverseBytes(x2);
1583         }
1584         return (x1 & 0xffff) < (x2 & 0xffff);
1585       }
1586 
1587       /**
1588        * Checks if Unsafe is available
1589        * @return true, if available, false - otherwise
1590        */
1591       public static boolean isAvailable()
1592       {
1593         return theUnsafe != null;
1594       }
1595 
1596       /**
1597        * Lexicographically compare two arrays.
1598        *
1599        * @param buffer1 left operand
1600        * @param buffer2 right operand
1601        * @param offset1 Where to start comparing in the left buffer
1602        * @param offset2 Where to start comparing in the right buffer
1603        * @param length1 How much to compare from the left buffer
1604        * @param length2 How much to compare from the right buffer
1605        * @return 0 if equal, < 0 if left is less than right, etc.
1606        */
1607       @Override
1608       public int compareTo(byte[] buffer1, int offset1, int length1,
1609           byte[] buffer2, int offset2, int length2) {
1610 
1611         // Short circuit equal case
1612         if (buffer1 == buffer2 &&
1613             offset1 == offset2 &&
1614             length1 == length2) {
1615           return 0;
1616         }
1617         final int minLength = Math.min(length1, length2);
1618         final int minWords = minLength / SIZEOF_LONG;
1619         final long offset1Adj = offset1 + BYTE_ARRAY_BASE_OFFSET;
1620         final long offset2Adj = offset2 + BYTE_ARRAY_BASE_OFFSET;
1621 
1622         /*
1623          * Compare 8 bytes at a time. Benchmarking shows comparing 8 bytes at a
1624          * time is no slower than comparing 4 bytes at a time even on 32-bit.
1625          * On the other hand, it is substantially faster on 64-bit.
1626          */
1627         // This is the end offset of long parts.
1628         int j = minWords << 3; // Same as minWords * SIZEOF_LONG
1629         for (int i = 0; i < j; i += SIZEOF_LONG) {
1630           long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
1631           long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
1632           long diff = lw ^ rw;
1633           if (diff != 0) {
1634               return lessThanUnsignedLong(lw, rw) ? -1 : 1;
1635           }
1636         }
1637         int offset = j;
1638 
1639         if (minLength - offset >= SIZEOF_INT) {
1640           int il = theUnsafe.getInt(buffer1, offset1Adj + offset);
1641           int ir = theUnsafe.getInt(buffer2, offset2Adj + offset);
1642           if (il != ir) {
1643             return lessThanUnsignedInt(il, ir) ? -1: 1;
1644           }
1645           offset += SIZEOF_INT;
1646         }
1647         if (minLength - offset >= SIZEOF_SHORT) {
1648           short sl = theUnsafe.getShort(buffer1, offset1Adj + offset);
1649           short sr = theUnsafe.getShort(buffer2, offset2Adj + offset);
1650           if (sl != sr) {
1651             return lessThanUnsignedShort(sl, sr) ? -1: 1;
1652           }
1653           offset += SIZEOF_SHORT;
1654         }
1655         if (minLength - offset == 1) {
1656           int a = (buffer1[(int)(offset1 + offset)] & 0xff);
1657           int b = (buffer2[(int)(offset2 + offset)] & 0xff);
1658           if (a != b) {
1659             return a - b;
1660           }
1661         }
1662         return length1 - length2;
1663       }
1664     }
1665   }
1666 
1667   /**
1668    * @param left left operand
1669    * @param right right operand
1670    * @return True if equal
1671    */
1672   public static boolean equals(final byte [] left, final byte [] right) {
1673     // Could use Arrays.equals?
1674     //noinspection SimplifiableConditionalExpression
1675     if (left == right) return true;
1676     if (left == null || right == null) return false;
1677     if (left.length != right.length) return false;
1678     if (left.length == 0) return true;
1679 
1680     // Since we're often comparing adjacent sorted data,
1681     // it's usual to have equal arrays except for the very last byte
1682     // so check that first
1683     if (left[left.length - 1] != right[right.length - 1]) return false;
1684 
1685     return compareTo(left, right) == 0;
1686   }
1687 
1688   public static boolean equals(final byte[] left, int leftOffset, int leftLen,
1689                                final byte[] right, int rightOffset, int rightLen) {
1690     // short circuit case
1691     if (left == right &&
1692         leftOffset == rightOffset &&
1693         leftLen == rightLen) {
1694       return true;
1695     }
1696     // different lengths fast check
1697     if (leftLen != rightLen) {
1698       return false;
1699     }
1700     if (leftLen == 0) {
1701       return true;
1702     }
1703 
1704     // Since we're often comparing adjacent sorted data,
1705     // it's usual to have equal arrays except for the very last byte
1706     // so check that first
1707     if (left[leftOffset + leftLen - 1] != right[rightOffset + rightLen - 1]) return false;
1708 
1709     return LexicographicalComparerHolder.BEST_COMPARER.
1710       compareTo(left, leftOffset, leftLen, right, rightOffset, rightLen) == 0;
1711   }
1712 
1713 
1714   /**
1715    * @param a left operand
1716    * @param buf right operand
1717    * @return True if equal
1718    */
1719   public static boolean equals(byte[] a, ByteBuffer buf) {
1720     if (a == null) return buf == null;
1721     if (buf == null) return false;
1722     if (a.length != buf.remaining()) return false;
1723 
1724     // Thou shalt not modify the original byte buffer in what should be read only operations.
1725     ByteBuffer b = buf.duplicate();
1726     for (byte anA : a) {
1727       if (anA != b.get()) {
1728         return false;
1729       }
1730     }
1731     return true;
1732   }
1733 
1734 
1735   /**
1736    * Return true if the byte array on the right is a prefix of the byte
1737    * array on the left.
1738    */
1739   public static boolean startsWith(byte[] bytes, byte[] prefix) {
1740     return bytes != null && prefix != null &&
1741       bytes.length >= prefix.length &&
1742       LexicographicalComparerHolder.BEST_COMPARER.
1743         compareTo(bytes, 0, prefix.length, prefix, 0, prefix.length) == 0;
1744   }
1745 
1746   /**
1747    * @param b bytes to hash
1748    * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
1749    * passed in array.  This method is what {@link org.apache.hadoop.io.Text}
1750    * use calculating hash code.
1751    */
1752   public static int hashCode(final byte [] b) {
1753     return hashCode(b, b.length);
1754   }
1755 
1756   /**
1757    * @param b value
1758    * @param length length of the value
1759    * @return Runs {@link WritableComparator#hashBytes(byte[], int)} on the
1760    * passed in array.  This method is what {@link org.apache.hadoop.io.Text}
1761    * use calculating hash code.
1762    */
1763   public static int hashCode(final byte [] b, final int length) {
1764     return WritableComparator.hashBytes(b, length);
1765   }
1766 
1767   /**
1768    * @param b bytes to hash
1769    * @return A hash of <code>b</code> as an Integer that can be used as key in
1770    * Maps.
1771    */
1772   public static Integer mapKey(final byte [] b) {
1773     return hashCode(b);
1774   }
1775 
1776   /**
1777    * @param b bytes to hash
1778    * @param length length to hash
1779    * @return A hash of <code>b</code> as an Integer that can be used as key in
1780    * Maps.
1781    */
1782   public static Integer mapKey(final byte [] b, final int length) {
1783     return hashCode(b, length);
1784   }
1785 
1786   /**
1787    * @param a lower half
1788    * @param b upper half
1789    * @return New array that has a in lower half and b in upper half.
1790    */
1791   public static byte [] add(final byte [] a, final byte [] b) {
1792     return add(a, b, EMPTY_BYTE_ARRAY);
1793   }
1794 
1795   /**
1796    * @param a first third
1797    * @param b second third
1798    * @param c third third
1799    * @return New array made from a, b and c
1800    */
1801   public static byte [] add(final byte [] a, final byte [] b, final byte [] c) {
1802     byte [] result = new byte[a.length + b.length + c.length];
1803     System.arraycopy(a, 0, result, 0, a.length);
1804     System.arraycopy(b, 0, result, a.length, b.length);
1805     System.arraycopy(c, 0, result, a.length + b.length, c.length);
1806     return result;
1807   }
1808 
1809   /**
1810    * @param a array
1811    * @param length amount of bytes to grab
1812    * @return First <code>length</code> bytes from <code>a</code>
1813    */
1814   public static byte [] head(final byte [] a, final int length) {
1815     if (a.length < length) {
1816       return null;
1817     }
1818     byte [] result = new byte[length];
1819     System.arraycopy(a, 0, result, 0, length);
1820     return result;
1821   }
1822 
1823   /**
1824    * @param a array
1825    * @param length amount of bytes to snarf
1826    * @return Last <code>length</code> bytes from <code>a</code>
1827    */
1828   public static byte [] tail(final byte [] a, final int length) {
1829     if (a.length < length) {
1830       return null;
1831     }
1832     byte [] result = new byte[length];
1833     System.arraycopy(a, a.length - length, result, 0, length);
1834     return result;
1835   }
1836 
1837   /**
1838    * @param a array
1839    * @param length new array size
1840    * @return Value in <code>a</code> plus <code>length</code> prepended 0 bytes
1841    */
1842   public static byte [] padHead(final byte [] a, final int length) {
1843     byte [] padding = new byte[length];
1844     for (int i = 0; i < length; i++) {
1845       padding[i] = 0;
1846     }
1847     return add(padding,a);
1848   }
1849 
1850   /**
1851    * @param a array
1852    * @param length new array size
1853    * @return Value in <code>a</code> plus <code>length</code> appended 0 bytes
1854    */
1855   public static byte [] padTail(final byte [] a, final int length) {
1856     byte [] padding = new byte[length];
1857     for (int i = 0; i < length; i++) {
1858       padding[i] = 0;
1859     }
1860     return add(a,padding);
1861   }
1862 
1863   /**
1864    * Split passed range.  Expensive operation relatively.  Uses BigInteger math.
1865    * Useful splitting ranges for MapReduce jobs.
1866    * @param a Beginning of range
1867    * @param b End of range
1868    * @param num Number of times to split range.  Pass 1 if you want to split
1869    * the range in two; i.e. one split.
1870    * @return Array of dividing values
1871    */
1872   public static byte [][] split(final byte [] a, final byte [] b, final int num) {
1873     return split(a, b, false, num);
1874   }
1875 
1876   /**
1877    * Split passed range.  Expensive operation relatively.  Uses BigInteger math.
1878    * Useful splitting ranges for MapReduce jobs.
1879    * @param a Beginning of range
1880    * @param b End of range
1881    * @param inclusive Whether the end of range is prefix-inclusive or is
1882    * considered an exclusive boundary.  Automatic splits are generally exclusive
1883    * and manual splits with an explicit range utilize an inclusive end of range.
1884    * @param num Number of times to split range.  Pass 1 if you want to split
1885    * the range in two; i.e. one split.
1886    * @return Array of dividing values
1887    */
1888   public static byte[][] split(final byte[] a, final byte[] b,
1889       boolean inclusive, final int num) {
1890     byte[][] ret = new byte[num + 2][];
1891     int i = 0;
1892     Iterable<byte[]> iter = iterateOnSplits(a, b, inclusive, num);
1893     if (iter == null)
1894       return null;
1895     for (byte[] elem : iter) {
1896       ret[i++] = elem;
1897     }
1898     return ret;
1899   }
1900 
1901   /**
1902    * Iterate over keys within the passed range, splitting at an [a,b) boundary.
1903    */
1904   public static Iterable<byte[]> iterateOnSplits(final byte[] a,
1905       final byte[] b, final int num)
1906   {
1907     return iterateOnSplits(a, b, false, num);
1908   }
1909 
1910   /**
1911    * Iterate over keys within the passed range.
1912    */
1913   public static Iterable<byte[]> iterateOnSplits(
1914       final byte[] a, final byte[]b, boolean inclusive, final int num)
1915   {
1916     byte [] aPadded;
1917     byte [] bPadded;
1918     if (a.length < b.length) {
1919       aPadded = padTail(a, b.length - a.length);
1920       bPadded = b;
1921     } else if (b.length < a.length) {
1922       aPadded = a;
1923       bPadded = padTail(b, a.length - b.length);
1924     } else {
1925       aPadded = a;
1926       bPadded = b;
1927     }
1928     if (compareTo(aPadded,bPadded) >= 0) {
1929       throw new IllegalArgumentException("b <= a");
1930     }
1931     if (num <= 0) {
1932       throw new IllegalArgumentException("num cannot be <= 0");
1933     }
1934     byte [] prependHeader = {1, 0};
1935     final BigInteger startBI = new BigInteger(add(prependHeader, aPadded));
1936     final BigInteger stopBI = new BigInteger(add(prependHeader, bPadded));
1937     BigInteger diffBI = stopBI.subtract(startBI);
1938     if (inclusive) {
1939       diffBI = diffBI.add(BigInteger.ONE);
1940     }
1941     final BigInteger splitsBI = BigInteger.valueOf(num + 1);
1942     //when diffBI < splitBI, use an additional byte to increase diffBI
1943     if(diffBI.compareTo(splitsBI) < 0) {
1944       byte[] aPaddedAdditional = new byte[aPadded.length+1];
1945       byte[] bPaddedAdditional = new byte[bPadded.length+1];
1946       for (int i = 0; i < aPadded.length; i++){
1947         aPaddedAdditional[i] = aPadded[i];
1948       }
1949       for (int j = 0; j < bPadded.length; j++){
1950         bPaddedAdditional[j] = bPadded[j];
1951       }
1952       aPaddedAdditional[aPadded.length] = 0;
1953       bPaddedAdditional[bPadded.length] = 0;
1954       return iterateOnSplits(aPaddedAdditional, bPaddedAdditional, inclusive,  num);
1955     }
1956     final BigInteger intervalBI;
1957     try {
1958       intervalBI = diffBI.divide(splitsBI);
1959     } catch(Exception e) {
1960       LOG.error("Exception caught during division", e);
1961       return null;
1962     }
1963 
1964     final Iterator<byte[]> iterator = new Iterator<byte[]>() {
1965       private int i = -1;
1966 
1967       @Override
1968       public boolean hasNext() {
1969         return i < num+1;
1970       }
1971 
1972       @Override
1973       public byte[] next() {
1974         i++;
1975         if (i == 0) return a;
1976         if (i == num + 1) return b;
1977 
1978         BigInteger curBI = startBI.add(intervalBI.multiply(BigInteger.valueOf(i)));
1979         byte [] padded = curBI.toByteArray();
1980         if (padded[1] == 0)
1981           padded = tail(padded, padded.length - 2);
1982         else
1983           padded = tail(padded, padded.length - 1);
1984         return padded;
1985       }
1986 
1987       @Override
1988       public void remove() {
1989         throw new UnsupportedOperationException();
1990       }
1991 
1992     };
1993 
1994     return new Iterable<byte[]>() {
1995       @Override
1996       public Iterator<byte[]> iterator() {
1997         return iterator;
1998       }
1999     };
2000   }
2001 
2002   /**
2003    * @param bytes array to hash
2004    * @param offset offset to start from
2005    * @param length length to hash
2006    * */
2007   public static int hashCode(byte[] bytes, int offset, int length) {
2008     int hash = 1;
2009     for (int i = offset; i < offset + length; i++)
2010       hash = (31 * hash) + (int) bytes[i];
2011     return hash;
2012   }
2013 
2014   /**
2015    * @param t operands
2016    * @return Array of byte arrays made from passed array of Text
2017    */
2018   public static byte [][] toByteArrays(final String [] t) {
2019     byte [][] result = new byte[t.length][];
2020     for (int i = 0; i < t.length; i++) {
2021       result[i] = Bytes.toBytes(t[i]);
2022     }
2023     return result;
2024   }
2025 
2026   /**
2027    * @param t operands
2028    * @return Array of binary byte arrays made from passed array of binary strings
2029    */
2030   public static byte[][] toBinaryByteArrays(final String[] t) {
2031     byte[][] result = new byte[t.length][];
2032     for (int i = 0; i < t.length; i++) {
2033       result[i] = Bytes.toBytesBinary(t[i]);
2034     }
2035     return result;
2036   }
2037 
2038   /**
2039    * @param column operand
2040    * @return A byte array of a byte array where first and only entry is
2041    * <code>column</code>
2042    */
2043   public static byte [][] toByteArrays(final String column) {
2044     return toByteArrays(toBytes(column));
2045   }
2046 
2047   /**
2048    * @param column operand
2049    * @return A byte array of a byte array where first and only entry is
2050    * <code>column</code>
2051    */
2052   public static byte [][] toByteArrays(final byte [] column) {
2053     byte [][] result = new byte[1][];
2054     result[0] = column;
2055     return result;
2056   }
2057 
2058   /**
2059    * Binary search for keys in indexes using Bytes.BYTES_RAWCOMPARATOR
2060    *
2061    * @param arr array of byte arrays to search for
2062    * @param key the key you want to find
2063    * @param offset the offset in the key you want to find
2064    * @param length the length of the key
2065    * @return zero-based index of the key, if the key is present in the array.
2066    *         Otherwise, a value -(i + 1) such that the key is between arr[i -
2067    *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
2068    *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
2069    *         means that this function can return 2N + 1 different values
2070    *         ranging from -(N + 1) to N - 1.
2071    */
2072   public static int binarySearch(byte[][] arr, byte[] key, int offset, int length) {
2073     return binarySearch(arr, key, offset, length, Bytes.BYTES_RAWCOMPARATOR);
2074   }
2075   /**
2076    * Binary search for keys in indexes.
2077    *
2078    * @param arr array of byte arrays to search for
2079    * @param key the key you want to find
2080    * @param offset the offset in the key you want to find
2081    * @param length the length of the key
2082    * @param comparator a comparator to compare.
2083    * @return zero-based index of the key, if the key is present in the array.
2084    *         Otherwise, a value -(i + 1) such that the key is between arr[i -
2085    *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
2086    *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
2087    *         means that this function can return 2N + 1 different values
2088    *         ranging from -(N + 1) to N - 1.
2089    * @deprecated {@link Bytes#binarySearch(byte[][], byte[], int, int)}
2090    */
2091   @Deprecated
2092   public static int binarySearch(byte [][]arr, byte []key, int offset,
2093       int length, RawComparator<?> comparator) {
2094     if(comparator == null) {
2095       comparator = Bytes.BYTES_RAWCOMPARATOR;
2096     }
2097     int low = 0;
2098     int high = arr.length - 1;
2099 
2100     while (low <= high) {
2101       int mid = (low+high) >>> 1;
2102       // we have to compare in this order, because the comparator order
2103       // has special logic when the 'left side' is a special key.
2104       int cmp = comparator.compare(key, offset, length,
2105           arr[mid], 0, arr[mid].length);
2106       // key lives above the midpoint
2107       if (cmp > 0)
2108         low = mid + 1;
2109       // key lives below the midpoint
2110       else if (cmp < 0)
2111         high = mid - 1;
2112       // BAM. how often does this really happen?
2113       else
2114         return mid;
2115     }
2116     return - (low+1);
2117   }
2118 
2119   /**
2120    * Binary search for keys in indexes.
2121    *
2122    * @param arr array of byte arrays to search for
2123    * @param key the key you want to find
2124    * @param comparator a comparator to compare.
2125    * @return zero-based index of the key, if the key is present in the array.
2126    *         Otherwise, a value -(i + 1) such that the key is between arr[i -
2127    *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
2128    *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
2129    *         means that this function can return 2N + 1 different values
2130    *         ranging from -(N + 1) to N - 1.
2131    * @return the index of the block
2132    * @deprecated Use {@link Bytes#binarySearch(byte[][], Cell, Comparator)}
2133    */
2134   @Deprecated
2135   public static int binarySearch(byte[][] arr, Cell key, RawComparator<Cell> comparator) {
2136     int low = 0;
2137     int high = arr.length - 1;
2138     KeyValue.KeyOnlyKeyValue r = new KeyValue.KeyOnlyKeyValue();
2139     while (low <= high) {
2140       int mid = (low+high) >>> 1;
2141       // we have to compare in this order, because the comparator order
2142       // has special logic when the 'left side' is a special key.
2143       r.setKey(arr[mid], 0, arr[mid].length);
2144       int cmp = comparator.compare(key, r);
2145       // key lives above the midpoint
2146       if (cmp > 0)
2147         low = mid + 1;
2148       // key lives below the midpoint
2149       else if (cmp < 0)
2150         high = mid - 1;
2151       // BAM. how often does this really happen?
2152       else
2153         return mid;
2154     }
2155     return - (low+1);
2156   }
2157 
2158   /**
2159    * Binary search for keys in indexes.
2160    *
2161    * @param arr array of byte arrays to search for
2162    * @param key the key you want to find
2163    * @param comparator a comparator to compare.
2164    * @return zero-based index of the key, if the key is present in the array.
2165    *         Otherwise, a value -(i + 1) such that the key is between arr[i -
2166    *         1] and arr[i] non-inclusively, where i is in [0, i], if we define
2167    *         arr[-1] = -Inf and arr[N] = Inf for an N-element array. The above
2168    *         means that this function can return 2N + 1 different values
2169    *         ranging from -(N + 1) to N - 1.
2170    * @return the index of the block
2171    */
2172   public static int binarySearch(byte[][] arr, Cell key, Comparator<Cell> comparator) {
2173     int low = 0;
2174     int high = arr.length - 1;
2175     KeyValue.KeyOnlyKeyValue r = new KeyValue.KeyOnlyKeyValue();
2176     while (low <= high) {
2177       int mid = (low+high) >>> 1;
2178       // we have to compare in this order, because the comparator order
2179       // has special logic when the 'left side' is a special key.
2180       r.setKey(arr[mid], 0, arr[mid].length);
2181       int cmp = comparator.compare(key, r);
2182       // key lives above the midpoint
2183       if (cmp > 0)
2184         low = mid + 1;
2185       // key lives below the midpoint
2186       else if (cmp < 0)
2187         high = mid - 1;
2188       // BAM. how often does this really happen?
2189       else
2190         return mid;
2191     }
2192     return - (low+1);
2193   }
2194 
2195   /**
2196    * Bytewise binary increment/deincrement of long contained in byte array
2197    * on given amount.
2198    *
2199    * @param value - array of bytes containing long (length <= SIZEOF_LONG)
2200    * @param amount value will be incremented on (deincremented if negative)
2201    * @return array of bytes containing incremented long (length == SIZEOF_LONG)
2202    */
2203   public static byte [] incrementBytes(byte[] value, long amount)
2204   {
2205     byte[] val = value;
2206     if (val.length < SIZEOF_LONG) {
2207       // Hopefully this doesn't happen too often.
2208       byte [] newvalue;
2209       if (val[0] < 0) {
2210         newvalue = new byte[]{-1, -1, -1, -1, -1, -1, -1, -1};
2211       } else {
2212         newvalue = new byte[SIZEOF_LONG];
2213       }
2214       System.arraycopy(val, 0, newvalue, newvalue.length - val.length,
2215         val.length);
2216       val = newvalue;
2217     } else if (val.length > SIZEOF_LONG) {
2218       throw new IllegalArgumentException("Increment Bytes - value too big: " +
2219         val.length);
2220     }
2221     if(amount == 0) return val;
2222     if(val[0] < 0){
2223       return binaryIncrementNeg(val, amount);
2224     }
2225     return binaryIncrementPos(val, amount);
2226   }
2227 
2228   /* increment/deincrement for positive value */
2229   private static byte [] binaryIncrementPos(byte [] value, long amount) {
2230     long amo = amount;
2231     int sign = 1;
2232     if (amount < 0) {
2233       amo = -amount;
2234       sign = -1;
2235     }
2236     for(int i=0;i<value.length;i++) {
2237       int cur = ((int)amo % 256) * sign;
2238       amo = (amo >> 8);
2239       int val = value[value.length-i-1] & 0x0ff;
2240       int total = val + cur;
2241       if(total > 255) {
2242         amo += sign;
2243         total %= 256;
2244       } else if (total < 0) {
2245         amo -= sign;
2246       }
2247       value[value.length-i-1] = (byte)total;
2248       if (amo == 0) return value;
2249     }
2250     return value;
2251   }
2252 
2253   /* increment/deincrement for negative value */
2254   private static byte [] binaryIncrementNeg(byte [] value, long amount) {
2255     long amo = amount;
2256     int sign = 1;
2257     if (amount < 0) {
2258       amo = -amount;
2259       sign = -1;
2260     }
2261     for(int i=0;i<value.length;i++) {
2262       int cur = ((int)amo % 256) * sign;
2263       amo = (amo >> 8);
2264       int val = ((~value[value.length-i-1]) & 0x0ff) + 1;
2265       int total = cur - val;
2266       if(total >= 0) {
2267         amo += sign;
2268       } else if (total < -256) {
2269         amo -= sign;
2270         total %= 256;
2271       }
2272       value[value.length-i-1] = (byte)total;
2273       if (amo == 0) return value;
2274     }
2275     return value;
2276   }
2277 
2278   /**
2279    * Writes a string as a fixed-size field, padded with zeros.
2280    */
2281   public static void writeStringFixedSize(final DataOutput out, String s,
2282       int size) throws IOException {
2283     byte[] b = toBytes(s);
2284     if (b.length > size) {
2285       throw new IOException("Trying to write " + b.length + " bytes (" +
2286           toStringBinary(b) + ") into a field of length " + size);
2287     }
2288 
2289     out.writeBytes(s);
2290     for (int i = 0; i < size - s.length(); ++i)
2291       out.writeByte(0);
2292   }
2293 
2294   /**
2295    * Reads a fixed-size field and interprets it as a string padded with zeros.
2296    */
2297   public static String readStringFixedSize(final DataInput in, int size)
2298       throws IOException {
2299     byte[] b = new byte[size];
2300     in.readFully(b);
2301     int n = b.length;
2302     while (n > 0 && b[n - 1] == 0)
2303       --n;
2304 
2305     return toString(b, 0, n);
2306   }
2307 
2308   /**
2309    * Copy the byte array given in parameter and return an instance
2310    * of a new byte array with the same length and the same content.
2311    * @param bytes the byte array to duplicate
2312    * @return a copy of the given byte array
2313    */
2314   public static byte [] copy(byte [] bytes) {
2315     if (bytes == null) return null;
2316     byte [] result = new byte[bytes.length];
2317     System.arraycopy(bytes, 0, result, 0, bytes.length);
2318     return result;
2319   }
2320 
2321   /**
2322    * Copy the byte array given in parameter and return an instance
2323    * of a new byte array with the same length and the same content.
2324    * @param bytes the byte array to copy from
2325    * @return a copy of the given designated byte array
2326    * @param offset
2327    * @param length
2328    */
2329   public static byte [] copy(byte [] bytes, final int offset, final int length) {
2330     if (bytes == null) return null;
2331     byte [] result = new byte[length];
2332     System.arraycopy(bytes, offset, result, 0, length);
2333     return result;
2334   }
2335 
2336   /**
2337    * Search sorted array "a" for byte "key". I can't remember if I wrote this or copied it from
2338    * somewhere. (mcorgan)
2339    * @param a Array to search. Entries must be sorted and unique.
2340    * @param fromIndex First index inclusive of "a" to include in the search.
2341    * @param toIndex Last index exclusive of "a" to include in the search.
2342    * @param key The byte to search for.
2343    * @return The index of key if found. If not found, return -(index + 1), where negative indicates
2344    *         "not found" and the "index + 1" handles the "-0" case.
2345    */
2346   public static int unsignedBinarySearch(byte[] a, int fromIndex, int toIndex, byte key) {
2347     int unsignedKey = key & 0xff;
2348     int low = fromIndex;
2349     int high = toIndex - 1;
2350 
2351     while (low <= high) {
2352       int mid = (low + high) >>> 1;
2353       int midVal = a[mid] & 0xff;
2354 
2355       if (midVal < unsignedKey) {
2356         low = mid + 1;
2357       } else if (midVal > unsignedKey) {
2358         high = mid - 1;
2359       } else {
2360         return mid; // key found
2361       }
2362     }
2363     return -(low + 1); // key not found.
2364   }
2365 
2366   /**
2367    * Treat the byte[] as an unsigned series of bytes, most significant bits first.  Start by adding
2368    * 1 to the rightmost bit/byte and carry over all overflows to the more significant bits/bytes.
2369    *
2370    * @param input The byte[] to increment.
2371    * @return The incremented copy of "in".  May be same length or 1 byte longer.
2372    */
2373   public static byte[] unsignedCopyAndIncrement(final byte[] input) {
2374     byte[] copy = copy(input);
2375     if (copy == null) {
2376       throw new IllegalArgumentException("cannot increment null array");
2377     }
2378     for (int i = copy.length - 1; i >= 0; --i) {
2379       if (copy[i] == -1) {// -1 is all 1-bits, which is the unsigned maximum
2380         copy[i] = 0;
2381       } else {
2382         ++copy[i];
2383         return copy;
2384       }
2385     }
2386     // we maxed out the array
2387     byte[] out = new byte[copy.length + 1];
2388     out[0] = 1;
2389     System.arraycopy(copy, 0, out, 1, copy.length);
2390     return out;
2391   }
2392 
2393   public static boolean equals(List<byte[]> a, List<byte[]> b) {
2394     if (a == null) {
2395       if (b == null) {
2396         return true;
2397       }
2398       return false;
2399     }
2400     if (b == null) {
2401       return false;
2402     }
2403     if (a.size() != b.size()) {
2404       return false;
2405     }
2406     for (int i = 0; i < a.size(); ++i) {
2407       if (!Bytes.equals(a.get(i), b.get(i))) {
2408         return false;
2409       }
2410     }
2411     return true;
2412   }
2413 
2414   public static boolean isSorted(Collection<byte[]> arrays) {
2415     byte[] previous = new byte[0];
2416     for (byte[] array : IterableUtils.nullSafe(arrays)) {
2417       if (Bytes.compareTo(previous, array) > 0) {
2418         return false;
2419       }
2420       previous = array;
2421     }
2422     return true;
2423   }
2424 
2425   public static List<byte[]> getUtf8ByteArrays(List<String> strings) {
2426     List<byte[]> byteArrays = Lists.newArrayListWithCapacity(CollectionUtils.nullSafeSize(strings));
2427     for (String s : IterableUtils.nullSafe(strings)) {
2428       byteArrays.add(Bytes.toBytes(s));
2429     }
2430     return byteArrays;
2431   }
2432 
2433   /**
2434    * Returns the index of the first appearance of the value {@code target} in
2435    * {@code array}.
2436    *
2437    * @param array an array of {@code byte} values, possibly empty
2438    * @param target a primitive {@code byte} value
2439    * @return the least index {@code i} for which {@code array[i] == target}, or
2440    *     {@code -1} if no such index exists.
2441    */
2442   public static int indexOf(byte[] array, byte target) {
2443     for (int i = 0; i < array.length; i++) {
2444       if (array[i] == target) {
2445         return i;
2446       }
2447     }
2448     return -1;
2449   }
2450 
2451   /**
2452    * Returns the start position of the first occurrence of the specified {@code
2453    * target} within {@code array}, or {@code -1} if there is no such occurrence.
2454    *
2455    * <p>More formally, returns the lowest index {@code i} such that {@code
2456    * java.util.Arrays.copyOfRange(array, i, i + target.length)} contains exactly
2457    * the same elements as {@code target}.
2458    *
2459    * @param array the array to search for the sequence {@code target}
2460    * @param target the array to search for as a sub-sequence of {@code array}
2461    */
2462   public static int indexOf(byte[] array, byte[] target) {
2463     checkNotNull(array, "array");
2464     checkNotNull(target, "target");
2465     if (target.length == 0) {
2466       return 0;
2467     }
2468 
2469     outer:
2470     for (int i = 0; i < array.length - target.length + 1; i++) {
2471       for (int j = 0; j < target.length; j++) {
2472         if (array[i + j] != target[j]) {
2473           continue outer;
2474         }
2475       }
2476       return i;
2477     }
2478     return -1;
2479   }
2480 
2481   /**
2482    * @param array an array of {@code byte} values, possibly empty
2483    * @param target a primitive {@code byte} value
2484    * @return {@code true} if {@code target} is present as an element anywhere in {@code array}.
2485    */
2486   public static boolean contains(byte[] array, byte target) {
2487     return indexOf(array, target) > -1;
2488   }
2489 
2490   /**
2491    * @param array an array of {@code byte} values, possibly empty
2492    * @param target an array of {@code byte}
2493    * @return {@code true} if {@code target} is present anywhere in {@code array}
2494    */
2495   public static boolean contains(byte[] array, byte[] target) {
2496     return indexOf(array, target) > -1;
2497   }
2498 
2499   /**
2500    * Fill given array with zeros.
2501    * @param b array which needs to be filled with zeros
2502    */
2503   public static void zero(byte[] b) {
2504     zero(b, 0, b.length);
2505   }
2506 
2507   /**
2508    * Fill given array with zeros at the specified position.
2509    * @param b
2510    * @param offset
2511    * @param length
2512    */
2513   public static void zero(byte[] b, int offset, int length) {
2514     checkPositionIndex(offset, b.length, "offset");
2515     checkArgument(length > 0, "length must be greater than 0");
2516     checkPositionIndex(offset + length, b.length, "offset + length");
2517     Arrays.fill(b, offset, offset + length, (byte) 0);
2518   }
2519 
2520   private static final SecureRandom RNG = new SecureRandom();
2521 
2522   /**
2523    * Fill given array with random bytes.
2524    * @param b array which needs to be filled with random bytes
2525    */
2526   public static void random(byte[] b) {
2527     RNG.nextBytes(b);
2528   }
2529 
2530   /**
2531    * Fill given array with random bytes at the specified position.
2532    * @param b
2533    * @param offset
2534    * @param length
2535    */
2536   public static void random(byte[] b, int offset, int length) {
2537     checkPositionIndex(offset, b.length, "offset");
2538     checkArgument(length > 0, "length must be greater than 0");
2539     checkPositionIndex(offset + length, b.length, "offset + length");
2540     byte[] buf = new byte[length];
2541     RNG.nextBytes(buf);
2542     System.arraycopy(buf, 0, b, offset, length);
2543   }
2544 
2545   /**
2546    * Create a max byte array with the specified max byte count
2547    * @param maxByteCount the length of returned byte array
2548    * @return the created max byte array
2549    */
2550   public static byte[] createMaxByteArray(int maxByteCount) {
2551     byte[] maxByteArray = new byte[maxByteCount];
2552     for (int i = 0; i < maxByteArray.length; i++) {
2553       maxByteArray[i] = (byte) 0xff;
2554     }
2555     return maxByteArray;
2556   }
2557 
2558   /**
2559    * Create a byte array which is multiple given bytes
2560    * @param srcBytes
2561    * @param multiNum
2562    * @return byte array
2563    */
2564   public static byte[] multiple(byte[] srcBytes, int multiNum) {
2565     if (multiNum <= 0) {
2566       return new byte[0];
2567     }
2568     byte[] result = new byte[srcBytes.length * multiNum];
2569     for (int i = 0; i < multiNum; i++) {
2570       System.arraycopy(srcBytes, 0, result, i * srcBytes.length,
2571         srcBytes.length);
2572     }
2573     return result;
2574   }
2575 
2576   /**
2577    * Convert a byte array into a hex string
2578    * @param b
2579    */
2580   public static String toHex(byte[] b) {
2581     checkArgument(b.length > 0, "length must be greater than 0");
2582     return String.format("%x", new BigInteger(1, b));
2583   }
2584 
2585   /**
2586    * Create a byte array from a string of hash digits. The length of the
2587    * string must be a multiple of 2
2588    * @param hex
2589    */
2590   public static byte[] fromHex(String hex) {
2591     checkArgument(hex.length() > 0, "length must be greater than 0");
2592     checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
2593     // Make sure letters are upper case
2594     hex = hex.toUpperCase();
2595     byte[] b = new byte[hex.length() / 2];
2596     for (int i = 0; i < b.length; i++) {
2597       b[i] = (byte)((toBinaryFromHex((byte)hex.charAt(2 * i)) << 4) +
2598         toBinaryFromHex((byte)hex.charAt((2 * i + 1))));
2599     }
2600     return b;
2601   }
2602 
2603 }