View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.io;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.InputStream;
26  import java.io.IOException;
27  import java.io.ObjectInputStream;
28  import java.io.ObjectOutputStream;
29  import java.io.Serializable;
30  import java.lang.reflect.Array;
31  import java.lang.reflect.InvocationTargetException;
32  import java.lang.reflect.Method;
33  import java.util.ArrayList;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableSet;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.conf.Configurable;
42  import org.apache.hadoop.conf.Configuration;
43  import org.apache.hadoop.conf.Configured;
44  import org.apache.hadoop.hbase.ClusterStatus;
45  import org.apache.hadoop.hbase.DoNotRetryIOException;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HServerAddress;
50  import org.apache.hadoop.hbase.HServerInfo;
51  import org.apache.hadoop.hbase.HServerLoad;
52  import org.apache.hadoop.hbase.HTableDescriptor;
53  import org.apache.hadoop.hbase.KeyValue;
54  import org.apache.hadoop.hbase.client.Action;
55  import org.apache.hadoop.hbase.client.Append;
56  import org.apache.hadoop.hbase.client.Delete;
57  import org.apache.hadoop.hbase.client.Get;
58  import org.apache.hadoop.hbase.client.Increment;
59  import org.apache.hadoop.hbase.client.MultiAction;
60  import org.apache.hadoop.hbase.client.MultiResponse;
61  import org.apache.hadoop.hbase.client.Put;
62  import org.apache.hadoop.hbase.client.Result;
63  import org.apache.hadoop.hbase.client.Row;
64  import org.apache.hadoop.hbase.client.RowMutations;
65  import org.apache.hadoop.hbase.client.Scan;
66  import org.apache.hadoop.hbase.client.coprocessor.Exec;
67  import org.apache.hadoop.hbase.filter.BinaryComparator;
68  import org.apache.hadoop.hbase.filter.BitComparator;
69  import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
70  import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
71  import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
72  import org.apache.hadoop.hbase.filter.CompareFilter;
73  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
74  import org.apache.hadoop.hbase.filter.DependentColumnFilter;
75  import org.apache.hadoop.hbase.filter.Filter;
76  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
77  import org.apache.hadoop.hbase.filter.FuzzyRowFilter;
78  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
79  import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
80  import org.apache.hadoop.hbase.filter.PageFilter;
81  import org.apache.hadoop.hbase.filter.PrefixFilter;
82  import org.apache.hadoop.hbase.filter.QualifierFilter;
83  import org.apache.hadoop.hbase.filter.RandomRowFilter;
84  import org.apache.hadoop.hbase.filter.RowFilter;
85  import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
86  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
87  import org.apache.hadoop.hbase.filter.SkipFilter;
88  import org.apache.hadoop.hbase.filter.ValueFilter;
89  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
90  import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
91  import org.apache.hadoop.hbase.regionserver.HRegion;
92  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
93  import org.apache.hadoop.hbase.regionserver.wal.HLog;
94  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
95  import org.apache.hadoop.hbase.snapshot.HSnapshotDescription;
96  import org.apache.hadoop.hbase.util.Bytes;
97  import org.apache.hadoop.hbase.util.Classes;
98  import org.apache.hadoop.hbase.util.ProtoUtil;
99  import org.apache.hadoop.io.MapWritable;
100 import org.apache.hadoop.io.ObjectWritable;
101 import org.apache.hadoop.io.Text;
102 import org.apache.hadoop.io.Writable;
103 import org.apache.hadoop.io.WritableFactories;
104 import org.apache.hadoop.io.WritableUtils;
105 
106 import com.google.protobuf.Message;
107 
108 /**
109  * This is a customized version of the polymorphic hadoop
110  * {@link ObjectWritable}.  It removes UTF8 (HADOOP-414).
111  * Using {@link Text} instead of UTF-8 saves ~2% CPU between reading and writing
112  * objects running a short sequentialWrite Performance Evaluation test just in
113  * ObjectWritable alone; more when we're doing randomRead-ing.  Other
114  * optimizations include our passing codes for classes instead of the
115  * actual class names themselves.  This makes it so this class needs amendment
116  * if non-Writable classes are introduced -- if passed a Writable for which we
117  * have no code, we just do the old-school passing of the class name, etc. --
118  * but when passing codes the savings are large, particularly when cell
119  * data is small (If < a couple of kilobytes, the encoding/decoding of class
120  * name and reflection to instantiate class was costing in excess of the cell
121  * handling).
122  */
123 public class HbaseObjectWritable implements Writable, WritableWithSize, Configurable {
  /** Logger shared by this class and its subclasses. */
  protected final static Log LOG = LogFactory.getLog(HbaseObjectWritable.class);

  // Here we maintain two static maps of classes to code and vice versa.
  // Add new classes+codes as wanted or figure way to auto-generate these
  // maps from the HMasterInterface.
  /** Serialization code to class; filled in by the static initializer. */
  static final Map<Integer, Class<?>> CODE_TO_CLASS =
    new HashMap<Integer, Class<?>>();
  /** Class to serialization code; filled in by the static initializer. */
  static final Map<Class<?>, Integer> CLASS_TO_CODE =
    new HashMap<Class<?>, Integer>();
  // Special code that means 'not-encoded'; in this case we do old school
  // sending of the class name using reflection, etc.
  private static final byte NOT_ENCODED = 0;
  // Generic array means that the array type is not one of the pre-defined
  // arrays in the CLASS_TO_CODE map, but we still have to encode the array
  // since its elements are serializable by this class.
  private static final int GENERIC_ARRAY_CODE;
  // Value of the code counter when the static initializer finishes; returned
  // by getNextClassCode().
  private static final int NEXT_CLASS_CODE;
  // Registers every known class with a consecutive integer code, starting at
  // NOT_ENCODED + 1.  The position of a line in this block IS its wire code.
  static {
    ////////////////////////////////////////////////////////////////////////////
    // WARNING: Please do not insert, remove or swap any line in this static  //
    // block.  Doing so would change or shift all the codes used to serialize //
    // objects, which makes backwards compatibility very hard for clients.    //
    // New codes should always be added at the end. Code removal is           //
    // discouraged because code is a short now.                               //
    ////////////////////////////////////////////////////////////////////////////

    int code = NOT_ENCODED + 1;
    // Primitive types.
    addToMap(Boolean.TYPE, code++);
    addToMap(Byte.TYPE, code++);
    addToMap(Character.TYPE, code++);
    addToMap(Short.TYPE, code++);
    addToMap(Integer.TYPE, code++);
    addToMap(Long.TYPE, code++);
    addToMap(Float.TYPE, code++);
    addToMap(Double.TYPE, code++);
    addToMap(Void.TYPE, code++);

    // Other java types
    addToMap(String.class, code++);
    addToMap(byte [].class, code++);
    addToMap(byte [][].class, code++);

    // Hadoop types
    addToMap(Text.class, code++);
    addToMap(Writable.class, code++);
    addToMap(Writable [].class, code++);
    addToMap(HbaseMapWritable.class, code++);
    addToMap(NullInstance.class, code++);

    // Hbase types
    addToMap(HColumnDescriptor.class, code++);
    addToMap(HConstants.Modify.class, code++);

    // We used to have a class named HMsg but it has been removed.  Rather
    // than just axe it, use the following random Integer class -- we just
    // chose any class from java.lang -- instead, so that the codes that
    // follow stay in the same relative place.
    addToMap(Integer.class, code++);
    addToMap(Integer[].class, code++);

    addToMap(HRegion.class, code++);
    addToMap(HRegion[].class, code++);
    addToMap(HRegionInfo.class, code++);
    addToMap(HRegionInfo[].class, code++);
    addToMap(HServerAddress.class, code++);
    addToMap(HServerInfo.class, code++);
    addToMap(HTableDescriptor.class, code++);
    addToMap(MapWritable.class, code++);

    //
    // HBASE-880
    //
    addToMap(ClusterStatus.class, code++);
    addToMap(Delete.class, code++);
    addToMap(Get.class, code++);
    addToMap(KeyValue.class, code++);
    addToMap(KeyValue[].class, code++);
    addToMap(Put.class, code++);
    addToMap(Put[].class, code++);
    addToMap(Result.class, code++);
    addToMap(Result[].class, code++);
    addToMap(Scan.class, code++);

    addToMap(WhileMatchFilter.class, code++);
    addToMap(PrefixFilter.class, code++);
    addToMap(PageFilter.class, code++);
    addToMap(InclusiveStopFilter.class, code++);
    addToMap(ColumnCountGetFilter.class, code++);
    addToMap(SingleColumnValueFilter.class, code++);
    addToMap(SingleColumnValueExcludeFilter.class, code++);
    addToMap(BinaryComparator.class, code++);
    addToMap(BitComparator.class, code++);
    addToMap(CompareFilter.class, code++);
    addToMap(RowFilter.class, code++);
    addToMap(ValueFilter.class, code++);
    addToMap(QualifierFilter.class, code++);
    addToMap(SkipFilter.class, code++);
    addToMap(WritableByteArrayComparable.class, code++);
    addToMap(FirstKeyOnlyFilter.class, code++);
    addToMap(DependentColumnFilter.class, code++);

    addToMap(Delete [].class, code++);

    addToMap(HLog.Entry.class, code++);
    addToMap(HLog.Entry[].class, code++);
    addToMap(HLogKey.class, code++);

    addToMap(List.class, code++);

    addToMap(NavigableSet.class, code++);
    addToMap(ColumnPrefixFilter.class, code++);

    // Multi
    addToMap(Row.class, code++);
    addToMap(Action.class, code++);
    addToMap(MultiAction.class, code++);
    addToMap(MultiResponse.class, code++);

    // coprocessor execution
    addToMap(Exec.class, code++);
    addToMap(Increment.class, code++);

    addToMap(KeyOnlyFilter.class, code++);

    // serializable
    addToMap(Serializable.class, code++);

    addToMap(RandomRowFilter.class, code++);

    addToMap(CompareOp.class, code++);

    addToMap(ColumnRangeFilter.class, code++);

    addToMap(HServerLoad.class, code++);

    addToMap(RegionOpeningState.class, code++);

    addToMap(HTableDescriptor[].class, code++);

    addToMap(Append.class, code++);

    addToMap(RowMutations.class, code++);

    addToMap(Message.class, code++);

    // java.lang.reflect.Array is a placeholder for arrays not defined above
    GENERIC_ARRAY_CODE = code++;
    addToMap(Array.class, GENERIC_ARRAY_CODE);
    
    addToMap(FuzzyRowFilter.class, code++);

    // We aren't going to bump the rpc version number.
    // We don't want to cause incompatibility with older 0.94/0.92 clients.
    // NOTE: the counter is NOT incremented here, so NEXT_CLASS_CODE below is
    // the same value as the code registered for HSnapshotDescription.
    addToMap(HSnapshotDescription.class, code);

    // make sure that this is the last statement in this static block
    NEXT_CLASS_CODE = code;
  }
283 
  // Class the wrapped value is declared as; written ahead of the value.
  private Class<?> declaredClass;
  // The wrapped value itself; may be null.
  private Object instance;
  // Configuration handed to the static read/write helpers; set via setConf().
  private Configuration conf;
287 
288   /** default constructor for writable */
289   public HbaseObjectWritable() {
290     super();
291   }
292 
293   /**
294    * @param instance
295    */
296   public HbaseObjectWritable(Object instance) {
297     set(instance);
298   }
299 
300   /**
301    * @param declaredClass
302    * @param instance
303    */
304   public HbaseObjectWritable(Class<?> declaredClass, Object instance) {
305     this.declaredClass = declaredClass;
306     this.instance = instance;
307   }
308 
309   /** @return the instance, or null if none. */
310   public Object get() { return instance; }
311 
312   /** @return the class this is meant to be. */
313   public Class<?> getDeclaredClass() { return declaredClass; }
314 
315   /**
316    * Reset the instance.
317    * @param instance
318    */
319   public void set(Object instance) {
320     this.declaredClass = instance.getClass();
321     this.instance = instance;
322   }
323 
324   /**
325    * @see java.lang.Object#toString()
326    */
327   @Override
328   public String toString() {
329     return "OW[class=" + declaredClass + ",value=" + instance + "]";
330   }
331 
332 
333   public void readFields(DataInput in) throws IOException {
334     readObject(in, this, this.conf);
335   }
336 
337   public void write(DataOutput out) throws IOException {
338     writeObject(out, instance, declaredClass, conf);
339   }
340 
341   public long getWritableSize() {
342     return getWritableSize(instance, declaredClass, conf);
343   }
344 
345   private static class NullInstance extends Configured implements Writable {
346     Class<?> declaredClass;
347     /** default constructor for writable */
348     @SuppressWarnings("unused")
349     public NullInstance() { super(null); }
350 
351     /**
352      * @param declaredClass
353      * @param conf
354      */
355     public NullInstance(Class<?> declaredClass, Configuration conf) {
356       super(conf);
357       this.declaredClass = declaredClass;
358     }
359 
360     public void readFields(DataInput in) throws IOException {
361       this.declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
362     }
363 
364     public void write(DataOutput out) throws IOException {
365       writeClassCode(out, this.declaredClass);
366     }
367   }
368 
369   static Integer getClassCode(final Class<?> c)
370   throws IOException {
371     Integer code = CLASS_TO_CODE.get(c);
372     if (code == null ) {
373       if (List.class.isAssignableFrom(c)) {
374         code = CLASS_TO_CODE.get(List.class);
375       } else if (Writable.class.isAssignableFrom(c)) {
376         code = CLASS_TO_CODE.get(Writable.class);
377       } else if (c.isArray()) {
378         code = CLASS_TO_CODE.get(Array.class);
379       } else if (Message.class.isAssignableFrom(c)) {
380         code = CLASS_TO_CODE.get(Message.class);
381       } else if (Serializable.class.isAssignableFrom(c)){
382         code = CLASS_TO_CODE.get(Serializable.class);
383       }
384     }
385     return code;
386   }
387 
388   /**
389    * @return the next object code in the list.  Used in testing to verify that additional fields are not added 
390    */
391   static int getNextClassCode(){
392     return NEXT_CLASS_CODE;
393   }
394 
395   /**
396    * Write out the code for passed Class.
397    * @param out
398    * @param c
399    * @throws IOException
400    */
401   static void writeClassCode(final DataOutput out, final Class<?> c)
402       throws IOException {
403     Integer code = getClassCode(c);
404 
405     if (code == null) {
406       LOG.error("Unsupported type " + c);
407       StackTraceElement[] els = new Exception().getStackTrace();
408       for(StackTraceElement elem : els) {
409         LOG.error(elem.getMethodName());
410       }
411       throw new UnsupportedOperationException("No code for unexpected " + c);
412     }
413     WritableUtils.writeVInt(out, code);
414   }
415 
416   public static long getWritableSize(Object instance, Class declaredClass,
417                                      Configuration conf) {
418     long size = Bytes.SIZEOF_BYTE; // code
419     if (instance == null) {
420       return 0L;
421     }
422 
423     if (declaredClass.isArray()) {
424       if (declaredClass.equals(Result[].class)) {
425 
426         return size + Result.getWriteArraySize((Result[])instance);
427       }
428     }
429     if (declaredClass.equals(Result.class)) {
430       Result r = (Result) instance;
431       // one extra class code for writable instance.
432       return r.getWritableSize() + size + Bytes.SIZEOF_BYTE;
433     }
434     return 0L; // no hint is the default.
435   }
436   /**
437    * Write a {@link Writable}, {@link String}, primitive type, or an array of
438    * the preceding.
439    * @param out
440    * @param instance
441    * @param declaredClass
442    * @param conf
443    * @throws IOException
444    */
445   @SuppressWarnings("unchecked")
446   public static void writeObject(DataOutput out, Object instance,
447                                  Class declaredClass,
448                                  Configuration conf)
449   throws IOException {
450 
451     Object instanceObj = instance;
452     Class declClass = declaredClass;
453 
454     if (instanceObj == null) {                       // null
455       instanceObj = new NullInstance(declClass, conf);
456       declClass = Writable.class;
457     }
458     writeClassCode(out, declClass);
459     if (declClass.isArray()) {                // array
460       // If bytearray, just dump it out -- avoid the recursion and
461       // byte-at-a-time we were previously doing.
462       if (declClass.equals(byte [].class)) {
463         Bytes.writeByteArray(out, (byte [])instanceObj);
464       } else if(declClass.equals(Result [].class)) {
465         Result.writeArray(out, (Result [])instanceObj);
466       } else {
467         //if it is a Generic array, write the element's type
468         if (getClassCode(declaredClass) == GENERIC_ARRAY_CODE) {
469           Class<?> componentType = declaredClass.getComponentType();
470           writeClass(out, componentType);
471         }
472 
473         int length = Array.getLength(instanceObj);
474         out.writeInt(length);
475         for (int i = 0; i < length; i++) {
476           Object item = Array.get(instanceObj, i);
477           writeObject(out, item,
478                     item.getClass(), conf);
479         }
480       }
481     } else if (List.class.isAssignableFrom(declClass)) {
482       List list = (List)instanceObj;
483       int length = list.size();
484       out.writeInt(length);
485       for (int i = 0; i < length; i++) {
486         Object elem = list.get(i);
487         writeObject(out, elem,
488                   elem == null ? Writable.class : elem.getClass(), conf);
489       }
490     } else if (declClass == String.class) {   // String
491       Text.writeString(out, (String)instanceObj);
492     } else if (declClass.isPrimitive()) {     // primitive type
493       if (declClass == Boolean.TYPE) {        // boolean
494         out.writeBoolean(((Boolean)instanceObj).booleanValue());
495       } else if (declClass == Character.TYPE) { // char
496         out.writeChar(((Character)instanceObj).charValue());
497       } else if (declClass == Byte.TYPE) {    // byte
498         out.writeByte(((Byte)instanceObj).byteValue());
499       } else if (declClass == Short.TYPE) {   // short
500         out.writeShort(((Short)instanceObj).shortValue());
501       } else if (declClass == Integer.TYPE) { // int
502         out.writeInt(((Integer)instanceObj).intValue());
503       } else if (declClass == Long.TYPE) {    // long
504         out.writeLong(((Long)instanceObj).longValue());
505       } else if (declClass == Float.TYPE) {   // float
506         out.writeFloat(((Float)instanceObj).floatValue());
507       } else if (declClass == Double.TYPE) {  // double
508         out.writeDouble(((Double)instanceObj).doubleValue());
509       } else if (declClass == Void.TYPE) {    // void
510       } else {
511         throw new IllegalArgumentException("Not a primitive: "+declClass);
512       }
513     } else if (declClass.isEnum()) {         // enum
514       Text.writeString(out, ((Enum)instanceObj).name());
515     } else if (Message.class.isAssignableFrom(declaredClass)) {
516       Text.writeString(out, instanceObj.getClass().getName());
517       ((Message)instance).writeDelimitedTo(
518           DataOutputOutputStream.constructOutputStream(out));
519     } else if (Writable.class.isAssignableFrom(declClass)) { // Writable
520       Class <?> c = instanceObj.getClass();
521       Integer code = CLASS_TO_CODE.get(c);
522       if (code == null) {
523         out.writeByte(NOT_ENCODED);
524         Text.writeString(out, c.getName());
525       } else {
526         writeClassCode(out, c);
527       }
528       ((Writable)instanceObj).write(out);
529     } else if (Serializable.class.isAssignableFrom(declClass)) {
530       Class <?> c = instanceObj.getClass();
531       Integer code = CLASS_TO_CODE.get(c);
532       if (code == null) {
533         out.writeByte(NOT_ENCODED);
534         Text.writeString(out, c.getName());
535       } else {
536         writeClassCode(out, c);
537       }
538       ByteArrayOutputStream bos = null;
539       ObjectOutputStream oos = null;
540       try{
541         bos = new ByteArrayOutputStream();
542         oos = new ObjectOutputStream(bos);
543         oos.writeObject(instanceObj);
544         byte[] value = bos.toByteArray();
545         out.writeInt(value.length);
546         out.write(value);
547       } finally {
548         if(bos!=null) bos.close();
549         if(oos!=null) oos.close();
550       }
551     } else {
552       throw new IOException("Can't write: "+instanceObj+" as "+declClass);
553     }
554   }
555 
556   /** Writes the encoded class code as defined in CLASS_TO_CODE, or
557    * the whole class name if not defined in the mapping.
558    */
559   static void writeClass(DataOutput out, Class<?> c) throws IOException {
560     Integer code = CLASS_TO_CODE.get(c);
561     if (code == null) {
562       WritableUtils.writeVInt(out, NOT_ENCODED);
563       Text.writeString(out, c.getName());
564     } else {
565       WritableUtils.writeVInt(out, code);
566     }
567   }
568 
569   /** Reads and returns the class as written by {@link #writeClass(DataOutput, Class)} */
570   static Class<?> readClass(Configuration conf, DataInput in) throws IOException {
571     Class<?> instanceClass = null;
572     int b = (byte)WritableUtils.readVInt(in);
573     if (b == NOT_ENCODED) {
574       String className = Text.readString(in);
575       try {
576         instanceClass = getClassByName(conf, className);
577       } catch (ClassNotFoundException e) {
578         LOG.error("Can't find class " + className, e);
579         throw new IOException("Can't find class " + className, e);
580       }
581     } else {
582       instanceClass = CODE_TO_CLASS.get(b);
583     }
584     return instanceClass;
585   }
586 
587   /**
588    * Read a {@link Writable}, {@link String}, primitive type, or an array of
589    * the preceding.
590    * @param in
591    * @param conf
592    * @return the object
593    * @throws IOException
594    */
595   public static Object readObject(DataInput in, Configuration conf)
596     throws IOException {
597     return readObject(in, null, conf);
598   }
599 
600   /**
601    * Read a {@link Writable}, {@link String}, primitive type, or an array of
602    * the preceding.
603    * @param in
604    * @param objectWritable
605    * @param conf
606    * @return the object
607    * @throws IOException
608    */
609   @SuppressWarnings("unchecked")
610   public static Object readObject(DataInput in,
611       HbaseObjectWritable objectWritable, Configuration conf)
612   throws IOException {
613     Class<?> declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
614     Object instance;
615     if (declaredClass.isPrimitive()) {            // primitive types
616       if (declaredClass == Boolean.TYPE) {             // boolean
617         instance = Boolean.valueOf(in.readBoolean());
618       } else if (declaredClass == Character.TYPE) {    // char
619         instance = Character.valueOf(in.readChar());
620       } else if (declaredClass == Byte.TYPE) {         // byte
621         instance = Byte.valueOf(in.readByte());
622       } else if (declaredClass == Short.TYPE) {        // short
623         instance = Short.valueOf(in.readShort());
624       } else if (declaredClass == Integer.TYPE) {      // int
625         instance = Integer.valueOf(in.readInt());
626       } else if (declaredClass == Long.TYPE) {         // long
627         instance = Long.valueOf(in.readLong());
628       } else if (declaredClass == Float.TYPE) {        // float
629         instance = Float.valueOf(in.readFloat());
630       } else if (declaredClass == Double.TYPE) {       // double
631         instance = Double.valueOf(in.readDouble());
632       } else if (declaredClass == Void.TYPE) {         // void
633         instance = null;
634       } else {
635         throw new IllegalArgumentException("Not a primitive: "+declaredClass);
636       }
637     } else if (declaredClass.isArray()) {              // array
638       if (declaredClass.equals(byte [].class)) {
639         instance = Bytes.readByteArray(in);
640       } else if(declaredClass.equals(Result [].class)) {
641         instance = Result.readArray(in);
642       } else {
643         int length = in.readInt();
644         instance = Array.newInstance(declaredClass.getComponentType(), length);
645         for (int i = 0; i < length; i++) {
646           Array.set(instance, i, readObject(in, conf));
647         }
648       }
649     } else if (declaredClass.equals(Array.class)) { //an array not declared in CLASS_TO_CODE
650       Class<?> componentType = readClass(conf, in);
651       int length = in.readInt();
652       instance = Array.newInstance(componentType, length);
653       for (int i = 0; i < length; i++) {
654         Array.set(instance, i, readObject(in, conf));
655       }
656     } else if (List.class.isAssignableFrom(declaredClass)) {            // List
657       int length = in.readInt();
658       instance = new ArrayList(length);
659       for (int i = 0; i < length; i++) {
660         ((ArrayList)instance).add(readObject(in, conf));
661       }
662     } else if (declaredClass == String.class) {        // String
663       instance = Text.readString(in);
664     } else if (declaredClass.isEnum()) {         // enum
665       instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
666         Text.readString(in));
667     } else if (declaredClass == Message.class) {
668       String className = Text.readString(in);
669       try {
670         declaredClass = getClassByName(conf, className);
671         instance = tryInstantiateProtobuf(declaredClass, in);
672       } catch (ClassNotFoundException e) {
673         LOG.error("Can't find class " + className, e);
674         throw new DoNotRetryIOException("Can't find class " + className, e);
675       }
676     } else {                                      // Writable or Serializable
677       Class instanceClass = null;
678       int b = (byte)WritableUtils.readVInt(in);
679       if (b == NOT_ENCODED) {
680         String className = Text.readString(in);
681         try {
682           instanceClass = getClassByName(conf, className);
683         } catch (ClassNotFoundException e) {
684           LOG.error("Can't find class " + className, e);
685           throw new DoNotRetryIOException("Can't find class " + className, e);
686         }
687       } else {
688         instanceClass = CODE_TO_CLASS.get(b);
689       }
690       if(Writable.class.isAssignableFrom(instanceClass)){
691         Writable writable = WritableFactories.newInstance(instanceClass, conf);
692         try {
693           writable.readFields(in);
694         } catch (IOException io) {
695           LOG.error("Error in readFields", io);
696           throw io;
697         } catch (Exception e) {
698           LOG.error("Error in readFields", e);
699           throw new IOException("Error in readFields" , e);
700         }
701         instance = writable;
702         if (instanceClass == NullInstance.class) {  // null
703           declaredClass = ((NullInstance)instance).declaredClass;
704           instance = null;
705         }
706       } else {
707         int length = in.readInt();
708         byte[] objectBytes = new byte[length];
709         in.readFully(objectBytes);
710         ByteArrayInputStream bis = null;
711         ObjectInputStream ois = null;
712         try {
713           bis = new ByteArrayInputStream(objectBytes);
714           ois = new ObjectInputStream(bis);
715           instance = ois.readObject();
716         } catch (ClassNotFoundException e) {
717           LOG.error("Class not found when attempting to deserialize object", e);
718           throw new DoNotRetryIOException("Class not found when attempting to " +
719               "deserialize object", e);
720         } finally {
721           if(bis!=null) bis.close();
722           if(ois!=null) ois.close();
723         }
724       }
725     }
726     if (objectWritable != null) {                 // store values
727       objectWritable.declaredClass = declaredClass;
728       objectWritable.instance = instance;
729     }
730     return instance;
731   }
732 
733   /**
734    * Read a {@link Filter} which is written as a {@link Writable}
735    * or a {@link Filter} directly. A custom filter class may be loaded
736    * dynamically.
737    *
738    * @param in
739    * @param conf
740    * @return the filter
741    * @throws IOException
742    */
743   @SuppressWarnings("unchecked")
744   public static Filter readFilter(
745       DataInput in, Configuration conf) throws IOException {
746     Class<?> instanceClass = null;
747     int b = (byte)WritableUtils.readVInt(in);
748     if (b != NOT_ENCODED) {
749       instanceClass = CODE_TO_CLASS.get(b);
750       if (instanceClass == Writable.class) {
751         // In case Writable, the actual type code follows
752         b = (byte)WritableUtils.readVInt(in);
753         if (b != NOT_ENCODED) {
754           instanceClass = CODE_TO_CLASS.get(b);
755         }
756       }
757     }
758     if (b == NOT_ENCODED) {
759       String className = Text.readString(in);
760       try {
761         instanceClass = (Class<? extends Filter>)getClassByName(conf, className);
762       } catch (ClassNotFoundException cnfe) {
763         try {
764           instanceClass = Classes.getFilterClassByName(className);
765         } catch (ClassNotFoundException e) {
766           LOG.error("Can't find class " + className, e);
767           throw new DoNotRetryIOException("Can't find class " + className, e);
768         }
769       }
770     }
771     Filter filter = (Filter)WritableFactories.newInstance(
772       (Class<? extends Filter>)instanceClass, conf);
773     try {
774       filter.readFields(in);
775     } catch (IOException io) {
776       LOG.error("Error in readFields", io);
777       throw io;
778     } catch (Exception e) {
779       LOG.error("Error in readFields", e);
780       throw new IOException("Error in readFields" , e);
781     }
782     return filter;
783   }
784 
785   /**
786    * Try to instantiate a protocol buffer of the given message class
787    * from the given input stream.
788    *
789    * @param protoClass the class of the generated protocol buffer
790    * @param dataIn the input stream to read from
791    * @return the instantiated Message instance
792    * @throws IOException if an IO problem occurs
793    */
794   private static Message tryInstantiateProtobuf(
795       Class<?> protoClass,
796       DataInput dataIn) throws IOException {
797 
798     try {
799       if (dataIn instanceof InputStream) {
800         // We can use the built-in parseDelimitedFrom and not have to re-copy
801         // the data
802         Method parseMethod = getStaticProtobufMethod(protoClass,
803             "parseDelimitedFrom", InputStream.class);
804         return (Message)parseMethod.invoke(null, (InputStream)dataIn);
805       } else {
806         // Have to read it into a buffer first, since protobuf doesn't deal
807         // with the DataInput interface directly.
808 
809         // Read the size delimiter that writeDelimitedTo writes
810         int size = ProtoUtil.readRawVarint32(dataIn);
811         if (size < 0) {
812           throw new IOException("Invalid size: " + size);
813         }
814 
815         byte[] data = new byte[size];
816         dataIn.readFully(data);
817         Method parseMethod = getStaticProtobufMethod(protoClass,
818             "parseFrom", byte[].class);
819         return (Message)parseMethod.invoke(null, data);
820       }
821     } catch (InvocationTargetException e) {
822 
823       if (e.getCause() instanceof IOException) {
824         throw (IOException)e.getCause();
825       } else {
826         throw new IOException(e.getCause());
827       }
828     } catch (IllegalAccessException iae) {
829       throw new AssertionError("Could not access parse method in " +
830           protoClass);
831     }
832   }
833 
834   static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
835       Class<?> ... args) {
836 
837     try {
838       return declaredClass.getMethod(method, args);
839     } catch (Exception e) {
840       // This is a bug in Hadoop - protobufs should all have this static method
841       throw new AssertionError("Protocol buffer class " + declaredClass +
842           " does not have an accessible parseFrom(InputStream) method!");
843     }
844   }
845 
846   @SuppressWarnings("unchecked")
847   private static Class getClassByName(Configuration conf, String className)
848   throws ClassNotFoundException {
849     if(conf != null) {
850       return conf.getClassByName(className);
851     }
852     ClassLoader cl = Thread.currentThread().getContextClassLoader();
853     if(cl == null) {
854       cl = HbaseObjectWritable.class.getClassLoader();
855     }
856     return Class.forName(className, true, cl);
857   }
858 
859   private static void addToMap(final Class<?> clazz, final int code) {
860     CLASS_TO_CODE.put(clazz, code);
861     CODE_TO_CLASS.put(code, clazz);
862   }
863 
864   public void setConf(Configuration conf) {
865     this.conf = conf;
866   }
867 
868   public Configuration getConf() {
869     return this.conf;
870   }
871 }