001/*
002 * Licensed to the Apache Software Foundation (ASF) under one
003 * or more contributor license agreements.  See the NOTICE file
004 * distributed with this work for additional information
005 * regarding copyright ownership.  The ASF licenses this file
006 * to you under the Apache License, Version 2.0 (the
007 * "License"); you may not use this file except in compliance
008 * with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 */
018package org.apache.hadoop.hbase.util;
019
import static org.apache.hadoop.hbase.util.Order.ASCENDING;
import static org.apache.hadoop.hbase.util.Order.DESCENDING;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import org.apache.yetus.audience.InterfaceAudience;
028
029/**
030 * Utility class that handles ordered byte arrays. That is, unlike {@link Bytes}, these methods
031 * produce byte arrays which maintain the sort order of the original values.
032 * <h3>Encoding Format summary</h3>
033 * <p>
034 * Each value is encoded as one or more bytes. The first byte of the encoding, its meaning, and a
035 * terse description of the bytes that follow is given by the following table:
036 * </p>
037 * <table summary="Encodings">
038 * <tr>
039 * <th>Content Type</th>
040 * <th>Encoding</th>
041 * </tr>
042 * <tr>
043 * <td>NULL</td>
044 * <td>0x05</td>
045 * </tr>
046 * <tr>
047 * <td>negative infinity</td>
048 * <td>0x07</td>
049 * </tr>
050 * <tr>
051 * <td>negative large</td>
052 * <td>0x08, ~E, ~M</td>
053 * </tr>
054 * <tr>
055 * <td>negative medium</td>
056 * <td>0x13-E, ~M</td>
057 * </tr>
058 * <tr>
059 * <td>negative small</td>
060 * <td>0x14, -E, ~M</td>
061 * </tr>
062 * <tr>
063 * <td>zero</td>
064 * <td>0x15</td>
065 * </tr>
066 * <tr>
067 * <td>positive small</td>
068 * <td>0x16, ~-E, M</td>
069 * </tr>
070 * <tr>
071 * <td>positive medium</td>
072 * <td>0x17+E, M</td>
073 * </tr>
074 * <tr>
075 * <td>positive large</td>
076 * <td>0x22, E, M</td>
077 * </tr>
078 * <tr>
079 * <td>positive infinity</td>
080 * <td>0x23</td>
081 * </tr>
082 * <tr>
083 * <td>NaN</td>
 * <td>0x26</td>
085 * </tr>
 * <tr>
 * <td>fixed-length 32-bit integer</td>
 * <td>0x2b, I</td>
 * </tr>
 * <tr>
 * <td>fixed-length 64-bit integer</td>
 * <td>0x2c, I</td>
 * </tr>
094 * <tr>
095 * <td>fixed-length 8-bit integer</td>
096 * <td>0x29</td>
097 * </tr>
098 * <tr>
099 * <td>fixed-length 16-bit integer</td>
100 * <td>0x2a</td>
101 * </tr>
102 * <tr>
103 * <td>fixed-length 32-bit float</td>
104 * <td>0x30, F</td>
105 * </tr>
106 * <tr>
107 * <td>fixed-length 64-bit float</td>
108 * <td>0x31, F</td>
109 * </tr>
 * <tr>
 * <td>TEXT</td>
 * <td>0x34, T</td>
 * </tr>
 * <tr>
 * <td>variable length BLOB</td>
 * <td>0x37, B</td>
 * </tr>
 * <tr>
 * <td>byte-for-byte BLOB</td>
 * <td>0x38, X</td>
 * </tr>
122 * </table>
123 * <h3>Null Encoding</h3>
124 * <p>
125 * Each value that is a NULL encodes as a single byte of 0x05. Since every other value encoding
126 * begins with a byte greater than 0x05, this forces NULL values to sort first.
127 * </p>
128 * <h3>Text Encoding</h3>
129 * <p>
 * Each text value begins with a single byte of 0x34 and ends with a single byte of 0x00. There are
131 * zero or more intervening bytes that encode the text value. The intervening bytes are chosen so
132 * that the encoding will sort in the desired collating order. The intervening bytes may not contain
133 * a 0x00 character; the only 0x00 byte allowed in a text encoding is the final byte.
134 * </p>
135 * <p>
136 * The text encoding ends in 0x00 in order to ensure that when there are two strings where one is a
137 * prefix of the other that the shorter string will sort first.
138 * </p>
139 * <h3>Binary Encoding</h3>
140 * <p>
141 * There are two encoding strategies for binary fields, referred to as "BlobVar" and "BlobCopy".
142 * BlobVar is less efficient in both space and encoding time. It has no limitations on the range of
143 * encoded values. BlobCopy is a byte-for-byte copy of the input data followed by a termination
144 * byte. It is extremely fast to encode and decode. It carries the restriction of not allowing a
145 * 0x00 value in the input byte[] as this value is used as the termination byte.
146 * </p>
147 * <h4>BlobVar</h4>
148 * <p>
149 * "BlobVar" encodes the input byte[] in a manner similar to a variable length integer encoding. As
150 * with the other {@code OrderedBytes} encodings, the first encoded byte is used to indicate what
151 * kind of value follows. This header byte is 0x37 for BlobVar encoded values. As with the
152 * traditional varint encoding, the most significant bit of each subsequent encoded {@code byte} is
153 * used as a continuation marker. The 7 remaining bits contain the 7 most significant bits of the
154 * first unencoded byte. The next encoded byte starts with a continuation marker in the MSB. The
155 * least significant bit from the first unencoded byte follows, and the remaining 6 bits contain the
156 * 6 MSBs of the second unencoded byte. The encoding continues, encoding 7 bytes on to 8 encoded
157 * bytes. The MSB of the final encoded byte contains a termination marker rather than a continuation
158 * marker, and any remaining bits from the final input byte. Any trailing bits in the final encoded
159 * byte are zeros.
160 * </p>
161 * <h4>BlobCopy</h4>
162 * <p>
163 * "BlobCopy" is a simple byte-for-byte copy of the input data. It uses 0x38 as the header byte, and
164 * is terminated by 0x00 in the DESCENDING case. This alternative encoding is faster and more
165 * space-efficient, but it cannot accept values containing a 0x00 byte in DESCENDING order.
166 * </p>
167 * <h3>Variable-length Numeric Encoding</h3>
168 * <p>
169 * Numeric values must be coded so as to sort in numeric order. We assume that numeric values can be
 * both integer and floating point values. Clients must be careful to use inspection methods for
 * encoded values (such as {@link #isNumericInfinite(PositionedByteRange)} and
 * {@link #isNumericNaN(PositionedByteRange)}) to protect against decoding values into objects
 * which do not support these numeric concepts (such as {@link Long} and {@link BigDecimal}).
174 * </p>
175 * <p>
 * Simplest cases first: If the numeric value is a NaN, then the encoding is a single byte of 0x26.
177 * This causes NaN values to sort after every other numeric value.
178 * </p>
179 * <p>
180 * If the numeric value is a negative infinity then the encoding is a single byte of 0x07. Since
181 * every other numeric value except NaN has a larger initial byte, this encoding ensures that
182 * negative infinity will sort prior to every other numeric value other than NaN.
183 * </p>
184 * <p>
185 * If the numeric value is a positive infinity then the encoding is a single byte of 0x23. Every
186 * other numeric value encoding begins with a smaller byte, ensuring that positive infinity always
 * sorts last among numeric values. 0x23 is also smaller than 0x34, the initial byte of a text
188 * value, ensuring that every numeric value sorts before every text value.
189 * </p>
190 * <p>
191 * If the numeric value is exactly zero then it is encoded as a single byte of 0x15. Finite negative
192 * values will have initial bytes of 0x08 through 0x14 and finite positive values will have initial
193 * bytes of 0x16 through 0x22.
194 * </p>
195 * <p>
196 * For all numeric values, we compute a mantissa M and an exponent E. The mantissa is a base-100
197 * representation of the value. The exponent E determines where to put the decimal point.
198 * </p>
199 * <p>
200 * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is X
201 * (hence X&ge;0 and X&le;99) then the byte value will be 2*X+1 for every byte of the mantissa,
202 * except for the last byte which will be 2*X+0. The mantissa must be the minimum number of bytes
203 * necessary to represent the value; trailing X==0 digits are omitted. This means that the mantissa
204 * will never contain a byte with the value 0x00.
205 * </p>
206 * <p>
207 * If we assume all digits of the mantissa occur to the right of the decimal point, then the
208 * exponent E is the power of one hundred by which one must multiply the mantissa to recover the
209 * original value.
210 * </p>
211 * <p>
212 * Values are classified as large, medium, or small according to the value of E. If E is 11 or more,
213 * the value is large. For E between 0 and 10, the value is medium. For E less than zero, the value
214 * is small.
215 * </p>
216 * <p>
217 * Large positive values are encoded as a single byte 0x22 followed by E as a varint and then M.
218 * Medium positive values are a single byte of 0x17+E followed by M. Small positive values are
219 * encoded as a single byte 0x16 followed by the ones-complement of the varint for -E followed by M.
220 * </p>
221 * <p>
222 * Small negative values are encoded as a single byte 0x14 followed by -E as a varint and then the
223 * ones-complement of M. Medium negative values are encoded as a byte 0x13-E followed by the
224 * ones-complement of M. Large negative values consist of the single byte 0x08 followed by the
225 * ones-complement of the varint encoding of E followed by the ones-complement of M.
226 * </p>
227 * <h3>Fixed-length Integer Encoding</h3>
228 * <p>
229 * All 4-byte integers are serialized to a 5-byte, fixed-width, sortable byte format. All 8-byte
 * integers are serialized to the equivalent 9-byte format. Serialization is performed by writing a
231 * header byte, inverting the integer sign bit and writing the resulting bytes to the byte array in
232 * big endian order.
233 * </p>
234 * <h3>Fixed-length Floating Point Encoding</h3>
 * <p>
 * 32-bit and 64-bit floating point numbers are encoded to a 5-byte and 9-byte encoding format,
 * respectively. The format is identical, save for the precision respected in each step of the
 * operation.
 * </p>
 * <p>
 * This format ensures the following total ordering of floating point values:
 * Float.NEGATIVE_INFINITY &lt; -Float.MAX_VALUE &lt; ... &lt; -Float.MIN_VALUE &lt; -0.0 &lt; +0.0
 * &lt; Float.MIN_VALUE &lt; ... &lt; Float.MAX_VALUE &lt; Float.POSITIVE_INFINITY &lt; Float.NaN
 * </p>
244 * <p>
245 * Floating point numbers are encoded as specified in IEEE 754. A 32-bit single precision float
246 * consists of a sign bit, 8-bit unsigned exponent encoded in offset-127 notation, and a 23-bit
247 * significand. The format is described further in the
248 * <a href="http://en.wikipedia.org/wiki/Single_precision"> Single Precision Floating Point
249 * Wikipedia page</a>
250 * </p>
251 * <p>
252 * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 127</sup> &times;
253 * 1.significand
254 * </p>
255 * <p>
 * The IEEE 754 floating point format already preserves sort ordering for positive floating point
257 * numbers when the raw bytes are compared in most significant byte order. This is discussed further
258 * at <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm">
259 * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
260 * </p>
261 * <p>
 * Thus, we need only ensure that negative numbers sort in the exact opposite order as positive
263 * numbers (so that say, negative infinity is less than negative 1), and that all negative numbers
264 * compare less than any positive number. To accomplish this, we invert the sign bit of all floating
265 * point numbers, and we also invert the exponent and significand bits if the floating point number
266 * was negative.
267 * </p>
268 * <p>
269 * More specifically, we first store the floating point bits into a 32-bit int {@code j} using
270 * {@link Float#floatToIntBits}. This method collapses all NaNs into a single, canonical NaN value
271 * but otherwise leaves the bits unchanged. We then compute
272 * </p>
273 *
274 * <pre>
 * j &circ;= (j &gt;&gt; (Integer.SIZE - 1)) | Integer.MIN_VALUE
276 * </pre>
277 * <p>
278 * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
279 * bytes of {@code j} in most significant byte order is equivalent to performing a single precision
280 * floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs don't compare
281 * equal to anything when performing floating point comparisons).
282 * </p>
283 * <p>
284 * The resulting integer is then converted into a byte array by serializing the integer one byte at
285 * a time in most significant byte order. The serialized integer is prefixed by a single header
286 * byte. All serialized values are 5 bytes in length.
287 * </p>
288 * <p>
289 * {@code OrderedBytes} encodings are heavily influenced by the
290 * <a href="http://sqlite.org/src4/doc/trunk/www/key_encoding.wiki">SQLite4 Key Encoding</a>. Slight
 * deviations are made in the interest of order correctness and user extensibility. Fixed-width
292 * {@code Long} and {@link Double} encodings are based on implementations from the now defunct
293 * Orderly library.
294 * </p>
295 */
296@InterfaceAudience.Public
297public class OrderedBytes {
298
299  /*
300   * These constants define header bytes used to identify encoded values. Note that the values here
301   * are not exhaustive as the Numeric format encodes portions of its value within the header byte.
302   * The values listed here are directly applied to persisted data -- DO NOT modify the values
303   * specified here. Instead, gaps are placed intentionally between values so that new
304   * implementations can be inserted into the total ordering enforced here.
305   */
306  private static final byte NULL = 0x05;
307  // room for 1 expansion type
308  private static final byte NEG_INF = 0x07;
309  private static final byte NEG_LARGE = 0x08;
310  private static final byte NEG_MED_MIN = 0x09;
311  private static final byte NEG_MED_MAX = 0x13;
312  private static final byte NEG_SMALL = 0x14;
313  private static final byte ZERO = 0x15;
314  private static final byte POS_SMALL = 0x16;
315  private static final byte POS_MED_MIN = 0x17;
316  private static final byte POS_MED_MAX = 0x21;
317  private static final byte POS_LARGE = 0x22;
318  private static final byte POS_INF = 0x23;
319  // room for 2 expansion type
320  private static final byte NAN = 0x26;
321  // room for 2 expansion types
322  private static final byte FIXED_INT8 = 0x29;
323  private static final byte FIXED_INT16 = 0x2a;
324  private static final byte FIXED_INT32 = 0x2b;
325  private static final byte FIXED_INT64 = 0x2c;
326  // room for 3 expansion types
327  private static final byte FIXED_FLOAT32 = 0x30;
328  private static final byte FIXED_FLOAT64 = 0x31;
329  // room for 2 expansion type
330  private static final byte TEXT = 0x34;
331  // room for 2 expansion type
332  private static final byte BLOB_VAR = 0x37;
333  private static final byte BLOB_COPY = 0x38;
334
335  /*
336   * The following constant values are used by encoding implementations
337   */
338
339  public static final Charset UTF8 = Charset.forName("UTF-8");
340  private static final byte TERM = 0x00;
341
342  /**
343   * Max precision guaranteed to fit into a {@code long}.
344   */
345  public static final int MAX_PRECISION = 31;
346
347  /**
348   * The context used to normalize {@link BigDecimal} values.
349   */
350  public static final MathContext DEFAULT_MATH_CONTEXT =
351    new MathContext(MAX_PRECISION, RoundingMode.HALF_UP);
352
353  /**
354   * Creates the standard exception when the encoded header byte is unexpected for the decoding
355   * context.
356   * @param header value used in error message.
357   */
358  private static IllegalArgumentException unexpectedHeader(byte header) {
359    throw new IllegalArgumentException(
360      "unexpected value in first byte: 0x" + Long.toHexString(header));
361  }
362
363  /**
364   * Perform unsigned comparison between two long values. Conforms to the same interface as
365   * {@link org.apache.hadoop.hbase.CellComparator}.
366   */
367  private static int unsignedCmp(long x1, long x2) {
368    int cmp;
369    if ((cmp = (x1 < x2 ? -1 : (x1 == x2 ? 0 : 1))) == 0) return 0;
370    // invert the result when either value is negative
371    if ((x1 < 0) != (x2 < 0)) return -cmp;
372    return cmp;
373  }
374
375  /**
376   * Write a 32-bit unsigned integer to {@code dst} as 4 big-endian bytes.
377   * @return number of bytes written.
378   */
379  private static int putUint32(PositionedByteRange dst, int val) {
380    dst.put((byte) (val >>> 24)).put((byte) (val >>> 16)).put((byte) (val >>> 8)).put((byte) val);
381    return 4;
382  }
383
384  /**
385   * Encode an unsigned 64-bit unsigned integer {@code val} into {@code dst}.
386   * @param dst  The destination to which encoded bytes are written.
387   * @param val  The value to write.
388   * @param comp Compliment the encoded value when {@code comp} is true.
389   * @return number of bytes written.
390   */
391  static int putVaruint64(PositionedByteRange dst, long val, boolean comp) {
392    int w, y, len = 0;
393    final int offset = dst.getOffset(), start = dst.getPosition();
394    byte[] a = dst.getBytes();
395    Order ord = comp ? DESCENDING : ASCENDING;
396    if (-1 == unsignedCmp(val, 241L)) {
397      dst.put((byte) val);
398      len = dst.getPosition() - start;
399      ord.apply(a, offset + start, len);
400      return len;
401    }
402    if (-1 == unsignedCmp(val, 2288L)) {
403      y = (int) (val - 240);
404      dst.put((byte) (y / 256 + 241)).put((byte) (y % 256));
405      len = dst.getPosition() - start;
406      ord.apply(a, offset + start, len);
407      return len;
408    }
409    if (-1 == unsignedCmp(val, 67824L)) {
410      y = (int) (val - 2288);
411      dst.put((byte) 249).put((byte) (y / 256)).put((byte) (y % 256));
412      len = dst.getPosition() - start;
413      ord.apply(a, offset + start, len);
414      return len;
415    }
416    y = (int) val;
417    w = (int) (val >>> 32);
418    if (w == 0) {
419      if (-1 == unsignedCmp(y, 16777216L)) {
420        dst.put((byte) 250).put((byte) (y >>> 16)).put((byte) (y >>> 8)).put((byte) y);
421        len = dst.getPosition() - start;
422        ord.apply(a, offset + start, len);
423        return len;
424      }
425      dst.put((byte) 251);
426      putUint32(dst, y);
427      len = dst.getPosition() - start;
428      ord.apply(a, offset + start, len);
429      return len;
430    }
431    if (-1 == unsignedCmp(w, 256L)) {
432      dst.put((byte) 252).put((byte) w);
433      putUint32(dst, y);
434      len = dst.getPosition() - start;
435      ord.apply(a, offset + start, len);
436      return len;
437    }
438    if (-1 == unsignedCmp(w, 65536L)) {
439      dst.put((byte) 253).put((byte) (w >>> 8)).put((byte) w);
440      putUint32(dst, y);
441      len = dst.getPosition() - start;
442      ord.apply(a, offset + start, len);
443      return len;
444    }
445    if (-1 == unsignedCmp(w, 16777216L)) {
446      dst.put((byte) 254).put((byte) (w >>> 16)).put((byte) (w >>> 8)).put((byte) w);
447      putUint32(dst, y);
448      len = dst.getPosition() - start;
449      ord.apply(a, offset + start, len);
450      return len;
451    }
452    dst.put((byte) 255);
453    putUint32(dst, w);
454    putUint32(dst, y);
455    len = dst.getPosition() - start;
456    ord.apply(a, offset + start, len);
457    return len;
458  }
459
460  /**
461   * Inspect {@code src} for an encoded varuint64 for its length in bytes. Preserves the state of
462   * {@code src}.
463   * @param src  source buffer
464   * @param comp if true, parse the compliment of the value.
465   * @return the number of bytes consumed by this value.
466   */
467  static int lengthVaruint64(PositionedByteRange src, boolean comp) {
468    int a0 = (comp ? DESCENDING : ASCENDING).apply(src.peek()) & 0xff;
469    if (a0 <= 240) return 1;
470    if (a0 <= 248) return 2;
471    if (a0 == 249) return 3;
472    if (a0 == 250) return 4;
473    if (a0 == 251) return 5;
474    if (a0 == 252) return 6;
475    if (a0 == 253) return 7;
476    if (a0 == 254) return 8;
477    if (a0 == 255) return 9;
478    throw unexpectedHeader(src.peek());
479  }
480
481  /**
482   * Skip {@code src} over the encoded varuint64.
483   * @param src source buffer
484   * @param cmp if true, parse the compliment of the value.
485   * @return the number of bytes skipped.
486   */
487  static int skipVaruint64(PositionedByteRange src, boolean cmp) {
488    final int len = lengthVaruint64(src, cmp);
489    src.setPosition(src.getPosition() + len);
490    return len;
491  }
492
493  /**
494   * Decode a sequence of bytes in {@code src} as a varuint64. Compliment the encoded value when
495   * {@code comp} is true.
496   * @return the decoded value.
497   */
498  static long getVaruint64(PositionedByteRange src, boolean comp) {
499    assert src.getRemaining() >= lengthVaruint64(src, comp);
500    final long ret;
501    Order ord = comp ? DESCENDING : ASCENDING;
502    byte x = src.get();
503    final int a0 = ord.apply(x) & 0xff, a1, a2, a3, a4, a5, a6, a7, a8;
504    if (-1 == unsignedCmp(a0, 241)) {
505      return a0;
506    }
507    x = src.get();
508    a1 = ord.apply(x) & 0xff;
509    if (-1 == unsignedCmp(a0, 249)) {
510      return (a0 - 241L) * 256 + a1 + 240;
511    }
512    x = src.get();
513    a2 = ord.apply(x) & 0xff;
514    if (a0 == 249) {
515      return 2288L + 256 * a1 + a2;
516    }
517    x = src.get();
518    a3 = ord.apply(x) & 0xff;
519    if (a0 == 250) {
520      return ((long) a1 << 16L) | (a2 << 8) | a3;
521    }
522    x = src.get();
523    a4 = ord.apply(x) & 0xff;
524    ret = (((long) a1) << 24) | (a2 << 16) | (a3 << 8) | a4;
525    if (a0 == 251) {
526      return ret;
527    }
528    x = src.get();
529    a5 = ord.apply(x) & 0xff;
530    if (a0 == 252) {
531      return (ret << 8) | a5;
532    }
533    x = src.get();
534    a6 = ord.apply(x) & 0xff;
535    if (a0 == 253) {
536      return (ret << 16) | (a5 << 8) | a6;
537    }
538    x = src.get();
539    a7 = ord.apply(x) & 0xff;
540    if (a0 == 254) {
541      return (ret << 24) | (a5 << 16) | (a6 << 8) | a7;
542    }
543    x = src.get();
544    a8 = ord.apply(x) & 0xff;
545    return (ret << 32) | (((long) a5) << 24) | (a6 << 16) | (a7 << 8) | a8;
546  }
547
548  /**
549   * Strip all trailing zeros to ensure that no digit will be zero and round using our default
550   * context to ensure precision doesn't exceed max allowed. From Phoenix's {@code NumberUtil}.
551   * @return new {@link BigDecimal} instance
552   */
553  static BigDecimal normalize(BigDecimal val) {
554    return null == val ? null : val.stripTrailingZeros().round(DEFAULT_MATH_CONTEXT);
555  }
556
557  /**
558   * Read significand digits from {@code src} according to the magnitude of {@code e}.
559   * @param src  The source from which to read encoded digits.
560   * @param e    The magnitude of the first digit read.
561   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
562   * @return The decoded value.
563   * @throws IllegalArgumentException when read exceeds the remaining length of {@code src}.
564   */
565  private static BigDecimal decodeSignificand(PositionedByteRange src, int e, boolean comp) {
566    // TODO: can this be made faster?
567    byte[] a = src.getBytes();
568    final int start = src.getPosition(), offset = src.getOffset(), remaining = src.getRemaining();
569    Order ord = comp ? DESCENDING : ASCENDING;
570    BigDecimal m;
571    StringBuilder sb = new StringBuilder();
572    for (int i = 0;; i++) {
573      if (i > remaining) {
574        // we've exceeded this range's window
575        src.setPosition(start);
576        throw new IllegalArgumentException(
577          "Read exceeds range before termination byte found. offset: " + offset + " position: "
578            + (start + i));
579      }
580      // one byte -> 2 digits
581      // base-100 digits are encoded as val * 2 + 1 except for the termination digit.
582      int twoDigits = (ord.apply(a[offset + start + i]) & 0xff) / 2;
583      sb.append(String.format("%02d", twoDigits));
584      // detect termination digit
585      // Besides, as we will normalise the return value at last,
586      // we only need to decode at most MAX_PRECISION + 2 digits here.
587      if ((ord.apply(a[offset + start + i]) & 1) == 0 || sb.length() > MAX_PRECISION + 1) {
588        src.setPosition(start + i + 1);
589        break;
590      }
591    }
592    m = new BigDecimal(sb.toString());
593    int stepsMoveLeft = sb.charAt(0) != '0' ? m.precision() : m.precision() + 1;
594    stepsMoveLeft -= e * 2;
595
596    return normalize(m.movePointLeft(stepsMoveLeft));
597  }
598
599  /**
600   * Skip {@code src} over the significand bytes.
601   * @param src  The source from which to read encoded digits.
602   * @param comp Treat encoded bytes as compliments when {@code comp} is true.
603   * @return the number of bytes skipped.
604   */
605  private static int skipSignificand(PositionedByteRange src, boolean comp) {
606    byte[] a = src.getBytes();
607    final int offset = src.getOffset(), start = src.getPosition();
608    int i = src.getPosition();
609    while (((comp ? DESCENDING : ASCENDING).apply(a[offset + i++]) & 1) != 0)
610      ;
611    src.setPosition(i);
612    return i - start;
613  }
614
615  /**
616   * <p>
617   * Encode the small magnitude floating point number {@code val} using the key encoding. The caller
618   * guarantees that 1.0 > abs(val) > 0.0.
619   * </p>
620   * <p>
621   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
622   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
623   * possible without making {@code M} greater than or equal to 1.0.
624   * </p>
625   * <p>
626   * For this routine, {@code E} will always be zero or negative, since the original value is less
627   * than one. The encoding written by this routine is the ones-complement of the varint of the
628   * negative of {@code E} followed by the mantissa:
629   *
630   * <pre>
631   *   Encoding:   ~-E  M
632   * </pre>
633   * </p>
634   * @param dst The destination to which encoded digits are written.
635   * @param val The value to encode.
636   * @return the number of bytes written.
637   */
638  private static int encodeNumericSmall(PositionedByteRange dst, BigDecimal val) {
639    // TODO: this can be done faster?
640    // assert 1.0 > abs(val) > 0.0
641    BigDecimal abs = val.abs();
642    assert BigDecimal.ZERO.compareTo(abs) < 0 && BigDecimal.ONE.compareTo(abs) > 0;
643    byte[] a = dst.getBytes();
644    boolean isNeg = val.signum() == -1;
645    final int offset = dst.getOffset(), start = dst.getPosition();
646
647    if (isNeg) { /* Small negative number: 0x14, -E, ~M */
648      dst.put(NEG_SMALL);
649    } else { /* Small positive number: 0x16, ~-E, M */
650      dst.put(POS_SMALL);
651    }
652
653    // normalize abs(val) to determine E
654    int zerosBeforeFirstNonZero = abs.scale() - abs.precision();
655    int lengthToMoveRight =
656      zerosBeforeFirstNonZero % 2 == 0 ? zerosBeforeFirstNonZero : zerosBeforeFirstNonZero - 1;
657    int e = lengthToMoveRight / 2;
658    abs = abs.movePointRight(lengthToMoveRight);
659
660    putVaruint64(dst, e, !isNeg); // encode appropriate E value.
661
662    // encode M by peeling off centimal digits, encoding x as 2x+1
663    int startM = dst.getPosition();
664    encodeToCentimal(dst, abs);
665    // terminal digit should be 2x
666    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
667    if (isNeg) {
668      // negative values encoded as ~M
669      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
670    }
671    return dst.getPosition() - start;
672  }
673
674  /**
675   * Encode the large magnitude floating point number {@code val} using the key encoding. The caller
676   * guarantees that {@code val} will be finite and abs(val) >= 1.0.
677   * <p>
678   * A floating point value is encoded as an integer exponent {@code E} and a mantissa {@code M}.
679   * The original value is equal to {@code (M * 100^E)}. {@code E} is set to the smallest value
680   * possible without making {@code M} greater than or equal to 1.0.
681   * </p>
682   * <p>
683   * Each centimal digit of the mantissa is stored in a byte. If the value of the centimal digit is
684   * {@code X} (hence {@code X>=0} and {@code X<=99}) then the byte value will be {@code 2*X+1} for
685   * every byte of the mantissa, except for the last byte which will be {@code 2*X+0}. The mantissa
686   * must be the minimum number of bytes necessary to represent the value; trailing {@code X==0}
687   * digits are omitted. This means that the mantissa will never contain a byte with the value
688   * {@code 0x00}.
689   * </p>
690   * <p>
691   * If {@code E > 10}, then this routine writes of {@code E} as a varint followed by the mantissa
692   * as described above. Otherwise, if {@code E <= 10}, this routine only writes the mantissa and
693   * leaves the {@code E} value to be encoded as part of the opening byte of the field by the
694   * calling function.
695   *
696   * <pre>
697   *   Encoding:  M       (if E&lt;=10)
698   *              E M     (if E&gt;10)
699   * </pre>
700   * </p>
701   * @param dst The destination to which encoded digits are written.
702   * @param val The value to encode.
703   * @return the number of bytes written.
704   */
705  private static int encodeNumericLarge(PositionedByteRange dst, BigDecimal val) {
706    // TODO: this can be done faster
707    BigDecimal abs = val.abs();
708    byte[] a = dst.getBytes();
709    boolean isNeg = val.signum() == -1;
710    final int start = dst.getPosition(), offset = dst.getOffset();
711
712    if (isNeg) { /* Large negative number: 0x08, ~E, ~M */
713      dst.put(NEG_LARGE);
714    } else { /* Large positive number: 0x22, E, M */
715      dst.put(POS_LARGE);
716    }
717
718    // normalize abs(val) to determine E
719    int integerDigits = abs.precision() - abs.scale();
720    int lengthToMoveLeft = integerDigits % 2 == 0 ? integerDigits : integerDigits + 1;
721    int e = lengthToMoveLeft / 2;
722    abs = abs.movePointLeft(lengthToMoveLeft);
723
724    // encode appropriate header byte and/or E value.
725    if (e > 10) { /* large number, write out {~,}E */
726      putVaruint64(dst, e, isNeg);
727    } else {
728      if (isNeg) { /* Medium negative number: 0x13-E, ~M */
729        dst.put(start, (byte) (NEG_MED_MAX - e));
730      } else { /* Medium positive number: 0x17+E, M */
731        dst.put(start, (byte) (POS_MED_MIN + e));
732      }
733    }
734
735    // encode M by peeling off centimal digits, encoding x as 2x+1
736    int startM = dst.getPosition();
737    encodeToCentimal(dst, abs);
738    // terminal digit should be 2x
739    a[offset + dst.getPosition() - 1] = (byte) (a[offset + dst.getPosition() - 1] & 0xfe);
740    if (isNeg) {
741      // negative values encoded as ~M
742      DESCENDING.apply(a, offset + startM, dst.getPosition() - startM);
743    }
744    return dst.getPosition() - start;
745  }
746
747  /**
748   * Encode a value val in [0.01, 1.0) into Centimals. Util function for
749   * {@link OrderedBytes#encodeNumericLarge(PositionedByteRange, BigDecimal)} and
750   * {@link OrderedBytes#encodeNumericSmall(PositionedByteRange, BigDecimal)}
751   * @param dst The destination to which encoded digits are written.
752   * @param val A BigDecimal after the normalization. The value must be in [0.01, 1.0).
753   */
754  private static void encodeToCentimal(PositionedByteRange dst, BigDecimal val) {
755    // The input value val must be in [0.01, 1.0)
756    String stringOfAbs = val.stripTrailingZeros().toPlainString();
757    String value = stringOfAbs.substring(stringOfAbs.indexOf('.') + 1);
758    int d;
759
760    // If the first float digit is 0, we will encode one digit more than MAX_PRECISION
761    // We encode at most MAX_PRECISION significant digits into centimals,
762    // because the input value, has been already normalized.
763    int maxPrecision = value.charAt(0) == '0' ? MAX_PRECISION + 1 : MAX_PRECISION;
764    maxPrecision = Math.min(maxPrecision, value.length());
765    for (int i = 0; i < maxPrecision; i += 2) {
766      d = (value.charAt(i) - '0') * 10;
767      if (i + 1 < maxPrecision) {
768        d += (value.charAt(i + 1) - '0');
769      }
770      dst.put((byte) (2 * d + 1));
771    }
772  }
773
774  /**
775   * Encode a numerical value using the variable-length encoding.
776   * @param dst The destination to which encoded digits are written.
777   * @param val The value to encode.
778   * @param ord The {@link Order} to respect while encoding {@code val}.
779   * @return the number of bytes written.
780   */
781  public static int encodeNumeric(PositionedByteRange dst, long val, Order ord) {
782    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
783  }
784
785  /**
786   * Encode a numerical value using the variable-length encoding.
787   * @param dst The destination to which encoded digits are written.
788   * @param val The value to encode.
789   * @param ord The {@link Order} to respect while encoding {@code val}.
790   * @return the number of bytes written.
791   */
792  public static int encodeNumeric(PositionedByteRange dst, double val, Order ord) {
793    if (val == 0.0) {
794      dst.put(ord.apply(ZERO));
795      return 1;
796    }
797    if (Double.isNaN(val)) {
798      dst.put(ord.apply(NAN));
799      return 1;
800    }
801    if (val == Double.NEGATIVE_INFINITY) {
802      dst.put(ord.apply(NEG_INF));
803      return 1;
804    }
805    if (val == Double.POSITIVE_INFINITY) {
806      dst.put(ord.apply(POS_INF));
807      return 1;
808    }
809    return encodeNumeric(dst, BigDecimal.valueOf(val), ord);
810  }
811
812  /**
813   * Encode a numerical value using the variable-length encoding. If the number of significant
814   * digits of the value exceeds the {@link OrderedBytes#MAX_PRECISION}, the exceeding part will be
815   * lost.
816   * @param dst The destination to which encoded digits are written.
817   * @param val The value to encode.
818   * @param ord The {@link Order} to respect while encoding {@code val}.
819   * @return the number of bytes written.
820   */
821  public static int encodeNumeric(PositionedByteRange dst, BigDecimal val, Order ord) {
822    final int len, offset = dst.getOffset(), start = dst.getPosition();
823    if (null == val) {
824      return encodeNull(dst, ord);
825    } else if (BigDecimal.ZERO.compareTo(val) == 0) {
826      dst.put(ord.apply(ZERO));
827      return 1;
828    }
829    BigDecimal abs = val.abs();
830    if (BigDecimal.ONE.compareTo(abs) <= 0) { // abs(v) >= 1.0
831      len = encodeNumericLarge(dst, normalize(val));
832    } else { // 1.0 > abs(v) >= 0.0
833      len = encodeNumericSmall(dst, normalize(val));
834    }
835    ord.apply(dst.getBytes(), offset + start, len);
836    return len;
837  }
838
839  /**
840   * Decode a {@link BigDecimal} from {@code src}. Assumes {@code src} encodes a value in Numeric
841   * encoding and is within the valid range of {@link BigDecimal} values. {@link BigDecimal} does
   * not support {@code NaN} or {@code Infinite} values.
843   * @see #decodeNumericAsDouble(PositionedByteRange)
844   */
845  private static BigDecimal decodeNumericValue(PositionedByteRange src) {
846    final int e;
847    byte header = src.get();
848    boolean dsc = -1 == Integer.signum(header);
849    header = dsc ? DESCENDING.apply(header) : header;
850
851    if (header == NULL) return null;
852    if (header == NEG_LARGE) { /* Large negative number: 0x08, ~E, ~M */
853      e = (int) getVaruint64(src, !dsc);
854      return decodeSignificand(src, e, !dsc).negate();
855    }
856    if (header >= NEG_MED_MIN && header <= NEG_MED_MAX) {
857      /* Medium negative number: 0x13-E, ~M */
858      e = NEG_MED_MAX - header;
859      return decodeSignificand(src, e, !dsc).negate();
860    }
861    if (header == NEG_SMALL) { /* Small negative number: 0x14, -E, ~M */
862      e = (int) -getVaruint64(src, dsc);
863      return decodeSignificand(src, e, !dsc).negate();
864    }
865    if (header == ZERO) {
866      return BigDecimal.ZERO;
867    }
868    if (header == POS_SMALL) { /* Small positive number: 0x16, ~-E, M */
869      e = (int) -getVaruint64(src, !dsc);
870      return decodeSignificand(src, e, dsc);
871    }
872    if (header >= POS_MED_MIN && header <= POS_MED_MAX) {
873      /* Medium positive number: 0x17+E, M */
874      e = header - POS_MED_MIN;
875      return decodeSignificand(src, e, dsc);
876    }
877    if (header == POS_LARGE) { /* Large positive number: 0x22, E, M */
878      e = (int) getVaruint64(src, dsc);
879      return decodeSignificand(src, e, dsc);
880    }
881    throw unexpectedHeader(header);
882  }
883
884  /**
885   * Decode a primitive {@code double} value from the Numeric encoding. Numeric encoding is based on
886   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
887   * {@code double}, this method performs an implicit narrowing conversion as described in
888   * {@link BigDecimal#doubleValue()}.
889   * @throws NullPointerException     when the encoded value is {@code NULL}.
890   * @throws IllegalArgumentException when the encoded value is not a Numeric.
891   * @see #encodeNumeric(PositionedByteRange, double, Order)
892   * @see BigDecimal#doubleValue()
893   */
894  public static double decodeNumericAsDouble(PositionedByteRange src) {
895    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
896    if (isNull(src)) {
897      throw new NullPointerException("A null value cannot be decoded to a double.");
898    }
899    if (isNumericNaN(src)) {
900      src.get();
901      return Double.NaN;
902    }
903    if (isNumericZero(src)) {
904      src.get();
905      return Double.valueOf(0.0);
906    }
907
908    byte header = -1 == Integer.signum(src.peek()) ? DESCENDING.apply(src.peek()) : src.peek();
909
910    if (header == NEG_INF) {
911      src.get();
912      return Double.NEGATIVE_INFINITY;
913    } else if (header == POS_INF) {
914      src.get();
915      return Double.POSITIVE_INFINITY;
916    } else {
917      return decodeNumericValue(src).doubleValue();
918    }
919  }
920
921  /**
922   * Decode a primitive {@code long} value from the Numeric encoding. Numeric encoding is based on
923   * {@link BigDecimal}; in the event the encoded value is larger than can be represented in a
924   * {@code long}, this method performs an implicit narrowing conversion as described in
   * {@link BigDecimal#longValue()}.
926   * @throws NullPointerException     when the encoded value is {@code NULL}.
927   * @throws IllegalArgumentException when the encoded value is not a Numeric.
928   * @see #encodeNumeric(PositionedByteRange, long, Order)
929   * @see BigDecimal#longValue()
930   */
931  public static long decodeNumericAsLong(PositionedByteRange src) {
932    // TODO: should an encoded NULL value throw unexpectedHeader() instead?
933    if (isNull(src)) throw new NullPointerException();
934    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
935    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
936    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
937
938    if (isNumericZero(src)) {
939      src.get();
940      return Long.valueOf(0);
941    }
942    return decodeNumericValue(src).longValue();
943  }
944
945  /**
946   * Decode a {@link BigDecimal} value from the variable-length encoding.
947   * @throws IllegalArgumentException when the encoded value is not a Numeric.
948   * @see #encodeNumeric(PositionedByteRange, BigDecimal, Order)
949   */
950  public static BigDecimal decodeNumericAsBigDecimal(PositionedByteRange src) {
951    if (isNull(src)) {
952      src.get();
953      return null;
954    }
955    if (!isNumeric(src)) throw unexpectedHeader(src.peek());
956    if (isNumericNaN(src)) throw unexpectedHeader(src.peek());
957    if (isNumericInfinite(src)) throw unexpectedHeader(src.peek());
958    return decodeNumericValue(src);
959  }
960
961  /**
962   * Encode a String value. String encoding is 0x00-terminated and so it does not support
963   * {@code \u0000} codepoints in the value.
964   * @param dst The destination to which the encoded value is written.
965   * @param val The value to encode.
966   * @param ord The {@link Order} to respect while encoding {@code val}.
967   * @return the number of bytes written.
968   * @throws IllegalArgumentException when {@code val} contains a {@code \u0000}.
969   */
970  public static int encodeString(PositionedByteRange dst, String val, Order ord) {
971    if (null == val) {
972      return encodeNull(dst, ord);
973    }
974    if (val.contains("\u0000"))
975      throw new IllegalArgumentException("Cannot encode String values containing '\\u0000'");
976    final int offset = dst.getOffset(), start = dst.getPosition();
977    dst.put(TEXT);
978    // TODO: is there no way to decode into dst directly?
979    dst.put(val.getBytes(UTF8));
980    dst.put(TERM);
981    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
982    return dst.getPosition() - start;
983  }
984
985  /**
986   * Decode a String value.
987   */
988  public static String decodeString(PositionedByteRange src) {
989    final byte header = src.get();
990    if (header == NULL || header == DESCENDING.apply(NULL)) return null;
991    assert header == TEXT || header == DESCENDING.apply(TEXT);
992    Order ord = header == TEXT ? ASCENDING : DESCENDING;
993    byte[] a = src.getBytes();
994    final int offset = src.getOffset(), start = src.getPosition();
995    final byte terminator = ord.apply(TERM);
996    int rawStartPos = offset + start, rawTermPos = rawStartPos;
997    for (; a[rawTermPos] != terminator; rawTermPos++)
998      ;
999    src.setPosition(rawTermPos - offset + 1); // advance position to TERM + 1
1000    if (DESCENDING == ord) {
1001      // make a copy so that we don't disturb encoded value with ord.
1002      byte[] copy = new byte[rawTermPos - rawStartPos];
1003      System.arraycopy(a, rawStartPos, copy, 0, copy.length);
1004      ord.apply(copy);
1005      return new String(copy, UTF8);
1006    } else {
1007      return new String(a, rawStartPos, rawTermPos - rawStartPos, UTF8);
1008    }
1009  }
1010
1011  /**
1012   * Calculate the expected BlobVar encoded length based on unencoded length.
1013   */
1014  public static int blobVarEncodedLength(int len) {
1015    if (0 == len) return 2; // 1-byte header + 1-byte terminator
1016    else return (int) Math.ceil((len * 8) // 8-bits per input byte
1017      / 7.0) // 7-bits of input data per encoded byte, rounded up
1018      + 1; // + 1-byte header
1019  }
1020
1021  /**
1022   * Calculate the expected BlobVar decoded length based on encoded length.
1023   */
1024  static int blobVarDecodedLength(int len) {
1025    return ((len - 1) // 1-byte header
1026      * 7) // 7-bits of payload per encoded byte
1027      / 8; // 8-bits per byte
1028  }
1029
1030  /**
1031   * Encode a Blob value using a modified varint encoding scheme.
1032   * <p>
1033   * This format encodes a byte[] value such that no limitations on the input value are imposed. The
1034   * first byte encodes the encoding scheme that follows, {@link #BLOB_VAR}. Each encoded byte
1035   * thereafter consists of a header bit followed by 7 bits of payload. A header bit of '1'
1036   * indicates continuation of the encoding. A header bit of '0' indicates this byte contains the
1037   * last of the payload. An empty input value is encoded as the header byte immediately followed by
   * a termination byte {@code 0x00}. This is not ambiguous with the encoded value of
   * {@code [0x00]}, which results in {@code [0x80, 0x00]}.
1040   * </p>
1041   * @return the number of bytes written.
1042   */
1043  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, int voff, int vlen,
1044    Order ord) {
1045    if (null == val) {
1046      return encodeNull(dst, ord);
1047    }
1048    // Empty value is null-terminated. All other values are encoded as 7-bits per byte.
1049    assert dst.getRemaining() >= blobVarEncodedLength(vlen) : "buffer overflow expected.";
1050    final int offset = dst.getOffset(), start = dst.getPosition();
1051    dst.put(BLOB_VAR);
1052    if (0 == vlen) {
1053      dst.put(TERM);
1054    } else {
1055      byte s = 1, t = 0;
1056      for (int i = voff; i < vlen; i++) {
1057        dst.put((byte) (0x80 | t | ((val[i] & 0xff) >>> s)));
1058        if (s < 7) {
1059          t = (byte) (val[i] << (7 - s));
1060          s++;
1061        } else {
1062          dst.put((byte) (0x80 | val[i]));
1063          s = 1;
1064          t = 0;
1065        }
1066      }
1067      if (s > 1) {
1068        dst.put((byte) (0x7f & t));
1069      } else {
1070        dst.getBytes()[offset + dst.getPosition() - 1] =
1071          (byte) (dst.getBytes()[offset + dst.getPosition() - 1] & 0x7f);
1072      }
1073    }
1074    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1075    return dst.getPosition() - start;
1076  }
1077
1078  /**
1079   * Encode a blob value using a modified varint encoding scheme.
1080   * @return the number of bytes written.
1081   * @see #encodeBlobVar(PositionedByteRange, byte[], int, int, Order)
1082   */
1083  public static int encodeBlobVar(PositionedByteRange dst, byte[] val, Order ord) {
1084    return encodeBlobVar(dst, val, 0, null != val ? val.length : 0, ord);
1085  }
1086
1087  /**
1088   * Decode a blob value that was encoded using BlobVar encoding.
1089   */
  public static byte[] decodeBlobVar(PositionedByteRange src) {
    // Consume the header byte. A NULL header, in either sort order, decodes to a Java null.
    final byte header = src.get();
    if (header == NULL || header == DESCENDING.apply(NULL)) {
      return null;
    }
    assert header == BLOB_VAR || header == DESCENDING.apply(BLOB_VAR);
    // An unmodified BLOB_VAR header means ASCENDING; anything else is treated as DESCENDING.
    Order ord = BLOB_VAR == header ? ASCENDING : DESCENDING;
    if (src.peek() == ord.apply(TERM)) {
      // skip empty input buffer.
      src.get();
      return new byte[0];
    }
    final int offset = src.getOffset(), start = src.getPosition();
    int end;
    byte[] a = src.getBytes();
    // Find the last encoded byte: scan until the order-adjusted byte has no continuation bit
    // (0x80) set. NOTE(review): relies on TERM being 0x00, as the encoder's Javadoc states.
    for (end = start; (byte) (ord.apply(a[offset + end]) & 0x80) != TERM; end++)
      ;
    end++; // increment end to 1-past last byte
    // create ret buffer using length of encoded data + 1 (header byte)
    PositionedByteRange ret =
      new SimplePositionedMutableByteRange(blobVarDecodedLength(end - start + 1));
    // s is the right-shift applied to the 7 payload bits of the next encoded byte; t holds the
    // bits already gathered for the output byte under construction, shifted into its high end.
    int s = 6;
    byte t = (byte) ((ord.apply(a[offset + start]) << 1) & 0xff);
    for (int i = start + 1; i < end; i++) {
      if (s == 7) {
        // All 7 payload bits of this encoded byte complete the output byte, so step past it;
        // the next encoded byte begins a new cycle.
        ret.put((byte) (t | (ord.apply(a[offset + i]) & 0x7f)));
        i++;
        // Explicitly reset t: clean up the overflow buffer after decoding a full cycle so that
        // the assertion below still holds. This happens when the LSB in the last encoded byte
        // is 1. (HBASE-9893)
        t = 0;
      } else {
        // The high bits of this byte's payload complete the output byte.
        ret.put((byte) (t | ((ord.apply(a[offset + i]) & 0x7f) >>> s)));
      }
      // Stop before reading past the encoded value (i may have been advanced above).
      if (i == end) break;
      // Carry the remaining low bits of the current byte into the next output byte.
      t = (byte) ((ord.apply(a[offset + i]) << (8 - s)) & 0xff);
      s = s == 1 ? 7 : s - 1;
    }
    // Leave src positioned just past the encoded value.
    src.setPosition(end);
    assert t == 0 : "Unexpected bits remaining after decoding blob.";
    assert ret.getPosition() == ret.getLength() : "Allocated unnecessarily large return buffer.";
    return ret.getBytes();
  }
1132
1133  /**
1134   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1135   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1136   * {@code 0x00} in the value.
1137   * @return the number of bytes written.
1138   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1139   *                                  {@code 0x00} byte.
1140   */
1141  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, int voff, int vlen,
1142    Order ord) {
1143    if (null == val) {
1144      encodeNull(dst, ord);
1145      if (ASCENDING == ord) return 1;
1146      else {
1147        // DESCENDING ordered BlobCopy requires a termination bit to preserve
1148        // sort-order semantics of null values.
1149        dst.put(ord.apply(TERM));
1150        return 2;
1151      }
1152    }
1153    // Blobs as final entry in a compound key are written unencoded.
1154    assert dst.getRemaining() >= vlen + (ASCENDING == ord ? 1 : 2);
1155    if (DESCENDING == ord) {
1156      for (int i = 0; i < vlen; i++) {
1157        if (TERM == val[voff + i]) {
1158          throw new IllegalArgumentException("0x00 bytes not permitted in value.");
1159        }
1160      }
1161    }
1162    final int offset = dst.getOffset(), start = dst.getPosition();
1163    dst.put(BLOB_COPY);
1164    dst.put(val, voff, vlen);
1165    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1166    // sort-order semantics of null values.
1167    if (DESCENDING == ord) dst.put(TERM);
1168    ord.apply(dst.getBytes(), offset + start, dst.getPosition() - start);
1169    return dst.getPosition() - start;
1170  }
1171
1172  /**
1173   * Encode a Blob value as a byte-for-byte copy. BlobCopy encoding in DESCENDING order is NULL
1174   * terminated so as to preserve proper sorting of {@code []} and so it does not support
1175   * {@code 0x00} in the value.
1176   * @return the number of bytes written.
1177   * @throws IllegalArgumentException when {@code ord} is DESCENDING and {@code val} contains a
1178   *                                  {@code 0x00} byte.
1179   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1180   */
1181  public static int encodeBlobCopy(PositionedByteRange dst, byte[] val, Order ord) {
1182    return encodeBlobCopy(dst, val, 0, null != val ? val.length : 0, ord);
1183  }
1184
1185  /**
1186   * Decode a Blob value, byte-for-byte copy.
1187   * @see #encodeBlobCopy(PositionedByteRange, byte[], int, int, Order)
1188   */
1189  public static byte[] decodeBlobCopy(PositionedByteRange src) {
1190    byte header = src.get();
1191    if (header == NULL || header == DESCENDING.apply(NULL)) {
1192      return null;
1193    }
1194    assert header == BLOB_COPY || header == DESCENDING.apply(BLOB_COPY);
1195    Order ord = header == BLOB_COPY ? ASCENDING : DESCENDING;
1196    final int length = src.getRemaining() - (ASCENDING == ord ? 0 : 1);
1197    byte[] ret = new byte[length];
1198    src.get(ret);
1199    ord.apply(ret, 0, ret.length);
1200    // DESCENDING ordered BlobCopy requires a termination bit to preserve
1201    // sort-order semantics of null values.
1202    if (DESCENDING == ord) src.get();
1203    return ret;
1204  }
1205
1206  /**
1207   * Encode a null value.
1208   * @param dst The destination to which encoded digits are written.
1209   * @param ord The {@link Order} to respect while encoding {@code val}.
1210   * @return the number of bytes written.
1211   */
1212  public static int encodeNull(PositionedByteRange dst, Order ord) {
1213    dst.put(ord.apply(NULL));
1214    return 1;
1215  }
1216
1217  /**
1218   * Encode an {@code int8} value using the fixed-length encoding.
1219   * @return the number of bytes written.
1220   * @see #encodeInt64(PositionedByteRange, long, Order)
1221   * @see #decodeInt8(PositionedByteRange)
1222   */
1223  public static int encodeInt8(PositionedByteRange dst, byte val, Order ord) {
1224    final int offset = dst.getOffset(), start = dst.getPosition();
1225    dst.put(FIXED_INT8).put((byte) (val ^ 0x80));
1226    ord.apply(dst.getBytes(), offset + start, 2);
1227    return 2;
1228  }
1229
1230  /**
1231   * Decode an {@code int8} value.
1232   * @see #encodeInt8(PositionedByteRange, byte, Order)
1233   */
1234  public static byte decodeInt8(PositionedByteRange src) {
1235    final byte header = src.get();
1236    assert header == FIXED_INT8 || header == DESCENDING.apply(FIXED_INT8);
1237    Order ord = header == FIXED_INT8 ? ASCENDING : DESCENDING;
1238    return (byte) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1239  }
1240
1241  /**
1242   * Encode an {@code int16} value using the fixed-length encoding.
1243   * @return the number of bytes written.
1244   * @see #encodeInt64(PositionedByteRange, long, Order)
1245   * @see #decodeInt16(PositionedByteRange)
1246   */
1247  public static int encodeInt16(PositionedByteRange dst, short val, Order ord) {
1248    final int offset = dst.getOffset(), start = dst.getPosition();
1249    dst.put(FIXED_INT16).put((byte) ((val >> 8) ^ 0x80)).put((byte) val);
1250    ord.apply(dst.getBytes(), offset + start, 3);
1251    return 3;
1252  }
1253
1254  /**
1255   * Decode an {@code int16} value.
1256   * @see #encodeInt16(PositionedByteRange, short, Order)
1257   */
1258  public static short decodeInt16(PositionedByteRange src) {
1259    final byte header = src.get();
1260    assert header == FIXED_INT16 || header == DESCENDING.apply(FIXED_INT16);
1261    Order ord = header == FIXED_INT16 ? ASCENDING : DESCENDING;
1262    short val = (short) ((ord.apply(src.get()) ^ 0x80) & 0xff);
1263    val = (short) ((val << 8) + (ord.apply(src.get()) & 0xff));
1264    return val;
1265  }
1266
1267  /**
1268   * Encode an {@code int32} value using the fixed-length encoding.
1269   * @return the number of bytes written.
1270   * @see #encodeInt64(PositionedByteRange, long, Order)
1271   * @see #decodeInt32(PositionedByteRange)
1272   */
1273  public static int encodeInt32(PositionedByteRange dst, int val, Order ord) {
1274    final int offset = dst.getOffset(), start = dst.getPosition();
1275    dst.put(FIXED_INT32).put((byte) ((val >> 24) ^ 0x80)).put((byte) (val >> 16))
1276      .put((byte) (val >> 8)).put((byte) val);
1277    ord.apply(dst.getBytes(), offset + start, 5);
1278    return 5;
1279  }
1280
1281  /**
1282   * Decode an {@code int32} value.
1283   * @see #encodeInt32(PositionedByteRange, int, Order)
1284   */
1285  public static int decodeInt32(PositionedByteRange src) {
1286    final byte header = src.get();
1287    assert header == FIXED_INT32 || header == DESCENDING.apply(FIXED_INT32);
1288    Order ord = header == FIXED_INT32 ? ASCENDING : DESCENDING;
1289    int val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1290    for (int i = 1; i < 4; i++) {
1291      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1292    }
1293    return val;
1294  }
1295
1296  /**
1297   * Encode an {@code int64} value using the fixed-length encoding.
1298   * <p>
1299   * This format ensures that all longs sort in their natural order, as they would sort when using
1300   * signed long comparison.
1301   * </p>
1302   * <p>
1303   * All Longs are serialized to an 8-byte, fixed-width sortable byte format. Serialization is
1304   * performed by inverting the integer sign bit and writing the resulting bytes to the byte array
1305   * in big endian order. The encoded value is prefixed by the {@link #FIXED_INT64} header byte.
1306   * This encoding is designed to handle java language primitives and so Null values are NOT
1307   * supported by this implementation.
1308   * </p>
1309   * <p>
1310   * For example:
1311   * </p>
1312   *
1313   * <pre>
1314   * Input:   0x0000000000000005 (5)
1315   * Result:  0x288000000000000005
1316   *
1317   * Input:   0xfffffffffffffffb (-4)
1318   * Result:  0x280000000000000004
1319   *
1320   * Input:   0x7fffffffffffffff (Long.MAX_VALUE)
1321   * Result:  0x28ffffffffffffffff
1322   *
1323   * Input:   0x8000000000000000 (Long.MIN_VALUE)
1324   * Result:  0x287fffffffffffffff
1325   * </pre>
1326   * <p>
1327   * This encoding format, and much of this documentation string, is based on Orderly's
1328   * {@code FixedIntWritableRowKey}.
1329   * </p>
1330   * @return the number of bytes written.
1331   * @see #decodeInt64(PositionedByteRange)
1332   */
1333  public static int encodeInt64(PositionedByteRange dst, long val, Order ord) {
1334    final int offset = dst.getOffset(), start = dst.getPosition();
1335    dst.put(FIXED_INT64).put((byte) ((val >> 56) ^ 0x80)).put((byte) (val >> 48))
1336      .put((byte) (val >> 40)).put((byte) (val >> 32)).put((byte) (val >> 24))
1337      .put((byte) (val >> 16)).put((byte) (val >> 8)).put((byte) val);
1338    ord.apply(dst.getBytes(), offset + start, 9);
1339    return 9;
1340  }
1341
1342  /**
1343   * Decode an {@code int64} value.
1344   * @see #encodeInt64(PositionedByteRange, long, Order)
1345   */
1346  public static long decodeInt64(PositionedByteRange src) {
1347    final byte header = src.get();
1348    assert header == FIXED_INT64 || header == DESCENDING.apply(FIXED_INT64);
1349    Order ord = header == FIXED_INT64 ? ASCENDING : DESCENDING;
1350    long val = (ord.apply(src.get()) ^ 0x80) & 0xff;
1351    for (int i = 1; i < 8; i++) {
1352      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1353    }
1354    return val;
1355  }
1356
1357  /**
1358   * Encode a 32-bit floating point value using the fixed-length encoding. Encoding format is
1359   * described at length in {@link #encodeFloat64(PositionedByteRange, double, Order)}.
1360   * @return the number of bytes written.
1361   * @see #decodeFloat32(PositionedByteRange)
1362   * @see #encodeFloat64(PositionedByteRange, double, Order)
1363   */
1364  public static int encodeFloat32(PositionedByteRange dst, float val, Order ord) {
1365    final int offset = dst.getOffset(), start = dst.getPosition();
1366    int i = Float.floatToIntBits(val);
1367    i ^= ((i >> (Integer.SIZE - 1)) | Integer.MIN_VALUE);
1368    dst.put(FIXED_FLOAT32).put((byte) (i >> 24)).put((byte) (i >> 16)).put((byte) (i >> 8))
1369      .put((byte) i);
1370    ord.apply(dst.getBytes(), offset + start, 5);
1371    return 5;
1372  }
1373
1374  /**
1375   * Decode a 32-bit floating point value using the fixed-length encoding.
1376   * @see #encodeFloat32(PositionedByteRange, float, Order)
1377   */
1378  public static float decodeFloat32(PositionedByteRange src) {
1379    final byte header = src.get();
1380    assert header == FIXED_FLOAT32 || header == DESCENDING.apply(FIXED_FLOAT32);
1381    Order ord = header == FIXED_FLOAT32 ? ASCENDING : DESCENDING;
1382    int val = ord.apply(src.get()) & 0xff;
1383    for (int i = 1; i < 4; i++) {
1384      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1385    }
1386    val ^= (~val >> (Integer.SIZE - 1)) | Integer.MIN_VALUE;
1387    return Float.intBitsToFloat(val);
1388  }
1389
1390  /**
1391   * Encode a 64-bit floating point value using the fixed-length encoding.
1392   * <p>
1393   * This format ensures the following total ordering of floating point values:
1394   * Double.NEGATIVE_INFINITY &lt; -Double.MAX_VALUE &lt; ... &lt; -Double.MIN_VALUE &lt; -0.0 &lt;
   * +0.0 &lt; Double.MIN_VALUE &lt; ... &lt; Double.MAX_VALUE &lt; Double.POSITIVE_INFINITY &lt;
1396   * Double.NaN
1397   * </p>
1398   * <p>
1399   * Floating point numbers are encoded as specified in IEEE 754. A 64-bit double precision float
1400   * consists of a sign bit, 11-bit unsigned exponent encoded in offset-1023 notation, and a 52-bit
1401   * significand. The format is described further in the
1402   * <a href="http://en.wikipedia.org/wiki/Double_precision"> Double Precision Floating Point
1403   * Wikipedia page</a>
1404   * </p>
1405   * <p>
1406   * The value of a normal float is -1 <sup>sign bit</sup> &times; 2<sup>exponent - 1023</sup>
1407   * &times; 1.significand
1408   * </p>
1409   * <p>
   * The IEEE 754 floating point format already preserves sort ordering for positive floating point
1411   * numbers when the raw bytes are compared in most significant byte order. This is discussed
1412   * further at
1413   * <a href= "http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm" >
   * http://www.cygnus-software.com/papers/comparingfloats/comparingfloats.htm</a>
1415   * </p>
1416   * <p>
   * Thus, we need only ensure that negative numbers sort in the exact opposite order as
1418   * positive numbers (so that say, negative infinity is less than negative 1), and that all
1419   * negative numbers compare less than any positive number. To accomplish this, we invert the sign
1420   * bit of all floating point numbers, and we also invert the exponent and significand bits if the
1421   * floating point number was negative.
1422   * </p>
1423   * <p>
1424   * More specifically, we first store the floating point bits into a 64-bit long {@code l} using
1425   * {@link Double#doubleToLongBits}. This method collapses all NaNs into a single, canonical NaN
1426   * value but otherwise leaves the bits unchanged. We then compute
1427   * </p>
1428   *
1429   * <pre>
   * l &circ;= (l &gt;&gt; (Long.SIZE - 1)) | Long.MIN_VALUE
1431   * </pre>
1432   * <p>
1433   * which inverts the sign bit and XOR's all other bits with the sign bit itself. Comparing the raw
1434   * bytes of {@code l} in most significant byte order is equivalent to performing a double
1435   * precision floating point comparison on the underlying bits (ignoring NaN comparisons, as NaNs
1436   * don't compare equal to anything when performing floating point comparisons).
1437   * </p>
1438   * <p>
1439   * The resulting long integer is then converted into a byte array by serializing the long one byte
1440   * at a time in most significant byte order. The serialized integer is prefixed by a single header
1441   * byte. All serialized values are 9 bytes in length.
1442   * </p>
1443   * <p>
1444   * This encoding format, and much of this highly detailed documentation string, is based on
1445   * Orderly's {@code DoubleWritableRowKey}.
1446   * </p>
1447   * @return the number of bytes written.
1448   * @see #decodeFloat64(PositionedByteRange)
1449   */
1450  public static int encodeFloat64(PositionedByteRange dst, double val, Order ord) {
1451    final int offset = dst.getOffset(), start = dst.getPosition();
1452    long lng = Double.doubleToLongBits(val);
1453    lng ^= ((lng >> (Long.SIZE - 1)) | Long.MIN_VALUE);
1454    dst.put(FIXED_FLOAT64).put((byte) (lng >> 56)).put((byte) (lng >> 48)).put((byte) (lng >> 40))
1455      .put((byte) (lng >> 32)).put((byte) (lng >> 24)).put((byte) (lng >> 16))
1456      .put((byte) (lng >> 8)).put((byte) lng);
1457    ord.apply(dst.getBytes(), offset + start, 9);
1458    return 9;
1459  }
1460
1461  /**
1462   * Decode a 64-bit floating point value using the fixed-length encoding.
1463   * @see #encodeFloat64(PositionedByteRange, double, Order)
1464   */
1465  public static double decodeFloat64(PositionedByteRange src) {
1466    final byte header = src.get();
1467    assert header == FIXED_FLOAT64 || header == DESCENDING.apply(FIXED_FLOAT64);
1468    Order ord = header == FIXED_FLOAT64 ? ASCENDING : DESCENDING;
1469    long val = ord.apply(src.get()) & 0xff;
1470    for (int i = 1; i < 8; i++) {
1471      val = (val << 8) + (ord.apply(src.get()) & 0xff);
1472    }
1473    val ^= (~val >> (Long.SIZE - 1)) | Long.MIN_VALUE;
1474    return Double.longBitsToDouble(val);
1475  }
1476
1477  /**
   * Returns true when {@code src} appears to be positioned at an encoded value, false otherwise.
1479   */
1480  public static boolean isEncodedValue(PositionedByteRange src) {
1481    return isNull(src) || isNumeric(src) || isFixedInt8(src) || isFixedInt16(src)
1482      || isFixedInt32(src) || isFixedInt64(src) || isFixedFloat32(src) || isFixedFloat64(src)
1483      || isText(src) || isBlobCopy(src) || isBlobVar(src);
1484  }
1485
1486  /**
1487   * Return true when the next encoded value in {@code src} is null, false otherwise.
1488   */
1489  public static boolean isNull(PositionedByteRange src) {
1490    return NULL == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1491  }
1492
1493  /**
1494   * Return true when the next encoded value in {@code src} uses Numeric encoding, false otherwise.
1495   * {@code NaN}, {@code +/-Inf} are valid Numeric values.
1496   */
1497  public static boolean isNumeric(PositionedByteRange src) {
1498    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1499    return x >= NEG_INF && x <= NAN;
1500  }
1501
1502  /**
1503   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1504   * {@code Infinite}, false otherwise.
1505   */
1506  public static boolean isNumericInfinite(PositionedByteRange src) {
1507    byte x = (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1508    return NEG_INF == x || POS_INF == x;
1509  }
1510
1511  /**
1512   * Return true when the next encoded value in {@code src} uses Numeric encoding and is
1513   * {@code NaN}, false otherwise.
1514   */
1515  public static boolean isNumericNaN(PositionedByteRange src) {
1516    return NAN == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1517  }
1518
1519  /**
1520   * Return true when the next encoded value in {@code src} uses Numeric encoding and is {@code 0},
1521   * false otherwise.
1522   */
1523  public static boolean isNumericZero(PositionedByteRange src) {
1524    return ZERO == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1525  }
1526
1527  /**
1528   * Return true when the next encoded value in {@code src} uses fixed-width Int8 encoding, false
1529   * otherwise.
1530   */
1531  public static boolean isFixedInt8(PositionedByteRange src) {
1532    return FIXED_INT8
1533        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1534  }
1535
1536  /**
1537   * Return true when the next encoded value in {@code src} uses fixed-width Int16 encoding, false
1538   * otherwise.
1539   */
1540  public static boolean isFixedInt16(PositionedByteRange src) {
1541    return FIXED_INT16
1542        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1543  }
1544
1545  /**
1546   * Return true when the next encoded value in {@code src} uses fixed-width Int32 encoding, false
1547   * otherwise.
1548   */
1549  public static boolean isFixedInt32(PositionedByteRange src) {
1550    return FIXED_INT32
1551        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1552  }
1553
1554  /**
1555   * Return true when the next encoded value in {@code src} uses fixed-width Int64 encoding, false
1556   * otherwise.
1557   */
1558  public static boolean isFixedInt64(PositionedByteRange src) {
1559    return FIXED_INT64
1560        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1561  }
1562
1563  /**
1564   * Return true when the next encoded value in {@code src} uses fixed-width Float32 encoding, false
1565   * otherwise.
1566   */
1567  public static boolean isFixedFloat32(PositionedByteRange src) {
1568    return FIXED_FLOAT32
1569        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1570  }
1571
1572  /**
1573   * Return true when the next encoded value in {@code src} uses fixed-width Float64 encoding, false
1574   * otherwise.
1575   */
1576  public static boolean isFixedFloat64(PositionedByteRange src) {
1577    return FIXED_FLOAT64
1578        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1579  }
1580
1581  /**
1582   * Return true when the next encoded value in {@code src} uses Text encoding, false otherwise.
1583   */
1584  public static boolean isText(PositionedByteRange src) {
1585    return TEXT == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1586  }
1587
1588  /**
1589   * Return true when the next encoded value in {@code src} uses BlobVar encoding, false otherwise.
1590   */
1591  public static boolean isBlobVar(PositionedByteRange src) {
1592    return BLOB_VAR
1593        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1594  }
1595
1596  /**
1597   * Return true when the next encoded value in {@code src} uses BlobCopy encoding, false otherwise.
1598   */
1599  public static boolean isBlobCopy(PositionedByteRange src) {
1600    return BLOB_COPY
1601        == (-1 == Integer.signum(src.peek()) ? DESCENDING : ASCENDING).apply(src.peek());
1602  }
1603
1604  /**
1605   * Skip {@code buff}'s position forward over one encoded value.
1606   * @return number of bytes skipped.
1607   */
1608  public static int skip(PositionedByteRange src) {
1609    final int start = src.getPosition();
1610    byte header = src.get();
1611    Order ord = (-1 == Integer.signum(header)) ? DESCENDING : ASCENDING;
1612    header = ord.apply(header);
1613
1614    switch (header) {
1615      case NULL:
1616      case NEG_INF:
1617        return 1;
1618      case NEG_LARGE: /* Large negative number: 0x08, ~E, ~M */
1619        skipVaruint64(src, DESCENDING != ord);
1620        skipSignificand(src, DESCENDING != ord);
1621        return src.getPosition() - start;
1622      case NEG_MED_MIN: /* Medium negative number: 0x13-E, ~M */
1623      case NEG_MED_MIN + 0x01:
1624      case NEG_MED_MIN + 0x02:
1625      case NEG_MED_MIN + 0x03:
1626      case NEG_MED_MIN + 0x04:
1627      case NEG_MED_MIN + 0x05:
1628      case NEG_MED_MIN + 0x06:
1629      case NEG_MED_MIN + 0x07:
1630      case NEG_MED_MIN + 0x08:
1631      case NEG_MED_MIN + 0x09:
1632      case NEG_MED_MAX:
1633        skipSignificand(src, DESCENDING != ord);
1634        return src.getPosition() - start;
1635      case NEG_SMALL: /* Small negative number: 0x14, -E, ~M */
1636        skipVaruint64(src, DESCENDING == ord);
1637        skipSignificand(src, DESCENDING != ord);
1638        return src.getPosition() - start;
1639      case ZERO:
1640        return 1;
1641      case POS_SMALL: /* Small positive number: 0x16, ~-E, M */
1642        skipVaruint64(src, DESCENDING != ord);
1643        skipSignificand(src, DESCENDING == ord);
1644        return src.getPosition() - start;
1645      case POS_MED_MIN: /* Medium positive number: 0x17+E, M */
1646      case POS_MED_MIN + 0x01:
1647      case POS_MED_MIN + 0x02:
1648      case POS_MED_MIN + 0x03:
1649      case POS_MED_MIN + 0x04:
1650      case POS_MED_MIN + 0x05:
1651      case POS_MED_MIN + 0x06:
1652      case POS_MED_MIN + 0x07:
1653      case POS_MED_MIN + 0x08:
1654      case POS_MED_MIN + 0x09:
1655      case POS_MED_MAX:
1656        skipSignificand(src, DESCENDING == ord);
1657        return src.getPosition() - start;
1658      case POS_LARGE: /* Large positive number: 0x22, E, M */
1659        skipVaruint64(src, DESCENDING == ord);
1660        skipSignificand(src, DESCENDING == ord);
1661        return src.getPosition() - start;
1662      case POS_INF:
1663        return 1;
1664      case NAN:
1665        return 1;
1666      case FIXED_INT8:
1667        src.setPosition(src.getPosition() + 1);
1668        return src.getPosition() - start;
1669      case FIXED_INT16:
1670        src.setPosition(src.getPosition() + 2);
1671        return src.getPosition() - start;
1672      case FIXED_INT32:
1673        src.setPosition(src.getPosition() + 4);
1674        return src.getPosition() - start;
1675      case FIXED_INT64:
1676        src.setPosition(src.getPosition() + 8);
1677        return src.getPosition() - start;
1678      case FIXED_FLOAT32:
1679        src.setPosition(src.getPosition() + 4);
1680        return src.getPosition() - start;
1681      case FIXED_FLOAT64:
1682        src.setPosition(src.getPosition() + 8);
1683        return src.getPosition() - start;
1684      case TEXT:
1685        // for null-terminated values, skip to the end.
1686        do {
1687          header = ord.apply(src.get());
1688        } while (header != TERM);
1689        return src.getPosition() - start;
1690      case BLOB_VAR:
1691        // read until we find a 0 in the MSB
1692        do {
1693          header = ord.apply(src.get());
1694        } while ((byte) (header & 0x80) != TERM);
1695        return src.getPosition() - start;
1696      case BLOB_COPY:
1697        if (Order.DESCENDING == ord) {
1698          // if descending, read to termination byte.
1699          do {
1700            header = ord.apply(src.get());
1701          } while (header != TERM);
1702          return src.getPosition() - start;
1703        } else {
1704          // otherwise, just skip to the end.
1705          src.setPosition(src.getLength());
1706          return src.getPosition() - start;
1707        }
1708      default:
1709        throw unexpectedHeader(header);
1710    }
1711  }
1712
1713  /**
1714   * Return the number of encoded entries remaining in {@code buff}. The state of {@code buff} is
1715   * not modified through use of this method.
1716   */
1717  public static int length(PositionedByteRange buff) {
1718    PositionedByteRange b =
1719      new SimplePositionedMutableByteRange(buff.getBytes(), buff.getOffset(), buff.getLength());
1720    b.setPosition(buff.getPosition());
1721    int cnt = 0;
1722    for (; isEncodedValue(b); skip(b), cnt++)
1723      ;
1724    return cnt;
1725  }
1726}