View Javadoc

1   /**
2    * Licensed to the Apache Software Foundation (ASF) under one
3    * or more contributor license agreements.  See the NOTICE file
4    * distributed with this work for additional information
5    * regarding copyright ownership.  The ASF licenses this file
6    * to you under the Apache License, Version 2.0 (the
7    * "License"); you may not use this file except in compliance
8    * with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   * Unless required by applicable law or agreed to in writing, software
13   * distributed under the License is distributed on an "AS IS" BASIS,
14   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15   * See the License for the specific language governing permissions and
16   * limitations under the License.
17   */
18  
19  package org.apache.hadoop.hbase.security.access;
20  
21  import java.io.ByteArrayInputStream;
22  import java.io.ByteArrayOutputStream;
23  import java.io.DataInput;
24  import java.io.DataOutput;
25  import java.io.IOException;
26  import java.io.InputStream;
27  import java.io.ObjectInputStream;
28  import java.io.ObjectOutputStream;
29  import java.io.Serializable;
30  import java.lang.reflect.Array;
31  import java.lang.reflect.InvocationTargetException;
32  import java.lang.reflect.Method;
33  import java.util.ArrayList;
34  import java.util.HashMap;
35  import java.util.List;
36  import java.util.Map;
37  import java.util.NavigableSet;
38  
39  import org.apache.commons.logging.Log;
40  import org.apache.commons.logging.LogFactory;
41  import org.apache.hadoop.hbase.classification.InterfaceAudience;
42  import org.apache.hadoop.conf.Configurable;
43  import org.apache.hadoop.conf.Configuration;
44  import org.apache.hadoop.conf.Configured;
45  import org.apache.hadoop.hbase.ClusterStatus;
46  import org.apache.hadoop.hbase.HColumnDescriptor;
47  import org.apache.hadoop.hbase.HConstants;
48  import org.apache.hadoop.hbase.HRegionInfo;
49  import org.apache.hadoop.hbase.HTableDescriptor;
50  import org.apache.hadoop.hbase.KeyValue;
51  import org.apache.hadoop.hbase.client.Action;
52  import org.apache.hadoop.hbase.client.Append;
53  import org.apache.hadoop.hbase.client.Delete;
54  import org.apache.hadoop.hbase.client.Get;
55  import org.apache.hadoop.hbase.client.Increment;
56  import org.apache.hadoop.hbase.client.MultiAction;
57  import org.apache.hadoop.hbase.client.MultiResponse;
58  import org.apache.hadoop.hbase.client.Put;
59  import org.apache.hadoop.hbase.client.Result;
60  import org.apache.hadoop.hbase.client.Row;
61  import org.apache.hadoop.hbase.client.RowMutations;
62  import org.apache.hadoop.hbase.client.Scan;
63  import org.apache.hadoop.hbase.filter.BinaryComparator;
64  import org.apache.hadoop.hbase.filter.BitComparator;
65  import org.apache.hadoop.hbase.filter.ByteArrayComparable;
66  import org.apache.hadoop.hbase.filter.ColumnCountGetFilter;
67  import org.apache.hadoop.hbase.filter.ColumnPrefixFilter;
68  import org.apache.hadoop.hbase.filter.ColumnRangeFilter;
69  import org.apache.hadoop.hbase.filter.CompareFilter;
70  import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
71  import org.apache.hadoop.hbase.filter.DependentColumnFilter;
72  import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
73  import org.apache.hadoop.hbase.filter.InclusiveStopFilter;
74  import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
75  import org.apache.hadoop.hbase.filter.PageFilter;
76  import org.apache.hadoop.hbase.filter.PrefixFilter;
77  import org.apache.hadoop.hbase.filter.QualifierFilter;
78  import org.apache.hadoop.hbase.filter.RandomRowFilter;
79  import org.apache.hadoop.hbase.filter.RowFilter;
80  import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
81  import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
82  import org.apache.hadoop.hbase.filter.SkipFilter;
83  import org.apache.hadoop.hbase.filter.ValueFilter;
84  import org.apache.hadoop.hbase.filter.WhileMatchFilter;
85  import org.apache.hadoop.hbase.io.WritableWithSize;
86  import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
87  import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
88  import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
89  import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
90  import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
91  import org.apache.hadoop.hbase.wal.WAL.Entry;
92  import org.apache.hadoop.hbase.wal.WALKey;
93  import org.apache.hadoop.hbase.util.Bytes;
94  import org.apache.hadoop.hbase.util.ProtoUtil;
95  import org.apache.hadoop.io.DataOutputOutputStream;
96  import org.apache.hadoop.io.MapWritable;
97  import org.apache.hadoop.io.ObjectWritable;
98  import org.apache.hadoop.io.Text;
99  import org.apache.hadoop.io.Writable;
100 import org.apache.hadoop.io.WritableFactories;
101 import org.apache.hadoop.io.WritableUtils;
102 
103 import com.google.protobuf.Message;
104 import com.google.protobuf.RpcController;
105 
106 /**
107  * <p>This is a customized version of the polymorphic hadoop
108  * {@link ObjectWritable}.  It removes UTF8 (HADOOP-414).
109  * Using {@link Text} intead of UTF-8 saves ~2% CPU between reading and writing
110  * objects running a short sequentialWrite Performance Evaluation test just in
111  * ObjectWritable alone; more when we're doing randomRead-ing.  Other
112  * optimizations include our passing codes for classes instead of the
113  * actual class names themselves.  This makes it so this class needs amendment
114  * if non-Writable classes are introduced -- if passed a Writable for which we
115  * have no code, we just do the old-school passing of the class name, etc. --
116  * but passing codes the  savings are large particularly when cell
117  * data is small (If < a couple of kilobytes, the encoding/decoding of class
118  * name and reflection to instantiate class was costing in excess of the cell
119  * handling).
120  * @deprecated This class is needed migrating TablePermissions written with
121  * Writables.  It is needed to read old permissions written pre-0.96.  This
122  * class is to be removed after HBase 0.96 ships since then all permissions
123  * will have been migrated and written with protobufs.
124  */
125 @Deprecated
126 @InterfaceAudience.Private
127 class HbaseObjectWritableFor96Migration implements Writable, WritableWithSize, Configurable {
128   protected final static Log LOG = LogFactory.getLog(HbaseObjectWritableFor96Migration.class);
129 
130   // Here we maintain two static maps of classes to code and vice versa.
131   // Add new classes+codes as wanted or figure way to auto-generate these
132   // maps.
133   static final Map<Integer, Class<?>> CODE_TO_CLASS =
134     new HashMap<Integer, Class<?>>();
135   static final Map<Class<?>, Integer> CLASS_TO_CODE =
136     new HashMap<Class<?>, Integer>();
137   // Special code that means 'not-encoded'; in this case we do old school
138   // sending of the class name using reflection, etc.
139   private static final byte NOT_ENCODED = 0;
140   //Generic array means that the array type is not one of the pre-defined arrays
141   //in the CLASS_TO_CODE map, but we have to still encode the array since it's
142   //elements are serializable by this class.
143   private static final int GENERIC_ARRAY_CODE;
144   private static final int NEXT_CLASS_CODE;
145   static {
146     ////////////////////////////////////////////////////////////////////////////
147     // WARNING: Please do not insert, remove or swap any line in this static  //
148     // block.  Doing so would change or shift all the codes used to serialize //
149     // objects, which makes backwards compatibility very hard for clients.    //
150     // New codes should always be added at the end. Code removal is           //
151     // discouraged because code is a short now.                               //
152     ////////////////////////////////////////////////////////////////////////////
153 
154     int code = NOT_ENCODED + 1;
155     // Primitive types.
156     addToMap(Boolean.TYPE, code++);
157     addToMap(Byte.TYPE, code++);
158     addToMap(Character.TYPE, code++);
159     addToMap(Short.TYPE, code++);
160     addToMap(Integer.TYPE, code++);
161     addToMap(Long.TYPE, code++);
162     addToMap(Float.TYPE, code++);
163     addToMap(Double.TYPE, code++);
164     addToMap(Void.TYPE, code++);
165 
166     // Other java types
167     addToMap(String.class, code++);
168     addToMap(byte [].class, code++);
169     addToMap(byte [][].class, code++);
170 
171     // Hadoop types
172     addToMap(Text.class, code++);
173     addToMap(Writable.class, code++);
174     addToMap(Writable [].class, code++);
175     code++; // Removed
176     addToMap(NullInstance.class, code++);
177 
178     // Hbase types
179     addToMap(HColumnDescriptor.class, code++);
180     addToMap(HConstants.Modify.class, code++);
181 
182     // We used to have a class named HMsg but its been removed.  Rather than
183     // just axe it, use following random Integer class -- we just chose any
184     // class from java.lang -- instead just so codes that follow stay
185     // in same relative place.
186     addToMap(Integer.class, code++);
187     addToMap(Integer[].class, code++);
188 
189     //HRegion shouldn't be pushed across the wire.
190     code++; //addToMap(HRegion.class, code++);
191     code++; //addToMap(HRegion[].class, code++);
192 
193     addToMap(HRegionInfo.class, code++);
194     addToMap(HRegionInfo[].class, code++);
195     code++; // Removed
196     code++; // Removed
197     addToMap(HTableDescriptor.class, code++);
198     addToMap(MapWritable.class, code++);
199 
200     //
201     // HBASE-880
202     //
203     addToMap(ClusterStatus.class, code++);
204     addToMap(Delete.class, code++);
205     addToMap(Get.class, code++);
206     addToMap(KeyValue.class, code++);
207     addToMap(KeyValue[].class, code++);
208     addToMap(Put.class, code++);
209     addToMap(Put[].class, code++);
210     addToMap(Result.class, code++);
211     addToMap(Result[].class, code++);
212     addToMap(Scan.class, code++);
213 
214     addToMap(WhileMatchFilter.class, code++);
215     addToMap(PrefixFilter.class, code++);
216     addToMap(PageFilter.class, code++);
217     addToMap(InclusiveStopFilter.class, code++);
218     addToMap(ColumnCountGetFilter.class, code++);
219     addToMap(SingleColumnValueFilter.class, code++);
220     addToMap(SingleColumnValueExcludeFilter.class, code++);
221     addToMap(BinaryComparator.class, code++);
222     addToMap(BitComparator.class, code++);
223     addToMap(CompareFilter.class, code++);
224     addToMap(RowFilter.class, code++);
225     addToMap(ValueFilter.class, code++);
226     addToMap(QualifierFilter.class, code++);
227     addToMap(SkipFilter.class, code++);
228     addToMap(ByteArrayComparable.class, code++);
229     addToMap(FirstKeyOnlyFilter.class, code++);
230     addToMap(DependentColumnFilter.class, code++);
231 
232     addToMap(Delete [].class, code++);
233 
234     addToMap(Entry.class, code++);
235     addToMap(Entry[].class, code++);
236     addToMap(HLogKey.class, code++);
237 
238     addToMap(List.class, code++);
239 
240     addToMap(NavigableSet.class, code++);
241     addToMap(ColumnPrefixFilter.class, code++);
242 
243     // Multi
244     addToMap(Row.class, code++);
245     addToMap(Action.class, code++);
246     addToMap(MultiAction.class, code++);
247     addToMap(MultiResponse.class, code++);
248 
249     // coprocessor execution
250     // Exec no longer exists --> addToMap(Exec.class, code++);
251     code++;
252     addToMap(Increment.class, code++);
253 
254     addToMap(KeyOnlyFilter.class, code++);
255 
256     // serializable
257     addToMap(Serializable.class, code++);
258 
259     addToMap(RandomRowFilter.class, code++);
260 
261     addToMap(CompareOp.class, code++);
262 
263     addToMap(ColumnRangeFilter.class, code++);
264 
265     // HServerLoad no longer exists; increase code so other classes stay the same.
266     code++;
267     //addToMap(HServerLoad.class, code++);
268 
269     addToMap(RegionOpeningState.class, code++);
270 
271     addToMap(HTableDescriptor[].class, code++);
272 
273     addToMap(Append.class, code++);
274 
275     addToMap(RowMutations.class, code++);
276 
277     addToMap(Message.class, code++);
278 
279     //java.lang.reflect.Array is a placeholder for arrays not defined above
280     GENERIC_ARRAY_CODE = code++;
281     addToMap(Array.class, GENERIC_ARRAY_CODE);
282 
283     addToMap(RpcController.class, code++);
284 
285     // make sure that this is the last statement in this static block
286     NEXT_CLASS_CODE = code;
287   }
288 
289   private Class<?> declaredClass;
290   private Object instance;
291   private Configuration conf;
292 
293   /** default constructor for writable */
294   HbaseObjectWritableFor96Migration() {
295     super();
296   }
297 
298   /**
299    * @param instance
300    */
301   HbaseObjectWritableFor96Migration(Object instance) {
302     set(instance);
303   }
304 
305   /**
306    * @param declaredClass
307    * @param instance
308    */
309   HbaseObjectWritableFor96Migration(Class<?> declaredClass, Object instance) {
310     this.declaredClass = declaredClass;
311     this.instance = instance;
312   }
313 
314   /** @return the instance, or null if none. */
315   Object get() { return instance; }
316 
317   /** @return the class this is meant to be. */
318   Class<?> getDeclaredClass() { return declaredClass; }
319 
320   /**
321    * Reset the instance.
322    * @param instance
323    */
324   void set(Object instance) {
325     this.declaredClass = instance.getClass();
326     this.instance = instance;
327   }
328 
329   /**
330    * @see java.lang.Object#toString()
331    */
332   @Override
333   public String toString() {
334     return "OW[class=" + declaredClass + ",value=" + instance + "]";
335   }
336 
337 
338   public void readFields(DataInput in) throws IOException {
339     readObject(in, this, this.conf);
340   }
341 
342   public void write(DataOutput out) throws IOException {
343     writeObject(out, instance, declaredClass, conf);
344   }
345 
346   public long getWritableSize() {
347     return getWritableSize(instance, declaredClass, conf);
348   }
349 
350   private static class NullInstance extends Configured implements Writable {
351     Class<?> declaredClass;
352     /** default constructor for writable */
353     @SuppressWarnings("unused")
354     public NullInstance() { super(null); }
355 
356     /**
357      * @param declaredClass
358      * @param conf
359      */
360     public NullInstance(Class<?> declaredClass, Configuration conf) {
361       super(conf);
362       this.declaredClass = declaredClass;
363     }
364 
365     public void readFields(DataInput in) throws IOException {
366       this.declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
367     }
368 
369     public void write(DataOutput out) throws IOException {
370       writeClassCode(out, this.declaredClass);
371     }
372   }
373 
374   static Integer getClassCode(final Class<?> c)
375   throws IOException {
376     Integer code = CLASS_TO_CODE.get(c);
377     if (code == null ) {
378       if (List.class.isAssignableFrom(c)) {
379         code = CLASS_TO_CODE.get(List.class);
380       } else if (Writable.class.isAssignableFrom(c)) {
381         code = CLASS_TO_CODE.get(Writable.class);
382       } else if (c.isArray()) {
383         code = CLASS_TO_CODE.get(Array.class);
384       } else if (Message.class.isAssignableFrom(c)) {
385         code = CLASS_TO_CODE.get(Message.class);
386       } else if (Serializable.class.isAssignableFrom(c)){
387         code = CLASS_TO_CODE.get(Serializable.class);
388       } else if (Scan.class.isAssignableFrom(c)) {
389         code = CLASS_TO_CODE.get(Scan.class);
390       }
391     }
392     return code;
393   }
394 
395   /**
396    * @return the next object code in the list.  Used in testing to verify that additional fields are not added 
397    */
398   static int getNextClassCode(){
399     return NEXT_CLASS_CODE;
400   }
401 
402   /**
403    * Write out the code for passed Class.
404    * @param out
405    * @param c
406    * @throws IOException
407    */
408   static void writeClassCode(final DataOutput out, final Class<?> c)
409       throws IOException {
410     Integer code = getClassCode(c);
411 
412     if (code == null) {
413       LOG.error("Unsupported type " + c);
414       StackTraceElement[] els = new Exception().getStackTrace();
415       for(StackTraceElement elem : els) {
416         LOG.error(elem.getMethodName());
417       }
418       throw new UnsupportedOperationException("No code for unexpected " + c);
419     }
420     WritableUtils.writeVInt(out, code);
421   }
422 
423   static long getWritableSize(Object instance, Class declaredClass,
424                                      Configuration conf) {
425     return 0L; // no hint is the default.
426   }
427   /**
428    * Write a {@link Writable}, {@link String}, primitive type, or an array of
429    * the preceding.
430    * @param out
431    * @param instance
432    * @param declaredClass
433    * @param conf
434    * @throws IOException
435    */
436   @SuppressWarnings("unchecked")
437   static void writeObject(DataOutput out, Object instance,
438                                  Class declaredClass,
439                                  Configuration conf)
440   throws IOException {
441 
442     Object instanceObj = instance;
443     Class declClass = declaredClass;
444 
445     if (instanceObj == null) {                       // null
446       instanceObj = new NullInstance(declClass, conf);
447       declClass = Writable.class;
448     }
449     writeClassCode(out, declClass);
450     if (declClass.isArray()) {                // array
451       // If bytearray, just dump it out -- avoid the recursion and
452       // byte-at-a-time we were previously doing.
453       if (declClass.equals(byte [].class)) {
454         Bytes.writeByteArray(out, (byte [])instanceObj);
455       } else {
456         //if it is a Generic array, write the element's type
457         if (getClassCode(declaredClass) == GENERIC_ARRAY_CODE) {
458           Class<?> componentType = declaredClass.getComponentType();
459           writeClass(out, componentType);
460         }
461 
462         int length = Array.getLength(instanceObj);
463         out.writeInt(length);
464         for (int i = 0; i < length; i++) {
465           Object item = Array.get(instanceObj, i);
466           writeObject(out, item,
467                     item.getClass(), conf);
468         }
469       }
470     } else if (List.class.isAssignableFrom(declClass)) {
471       List list = (List)instanceObj;
472       int length = list.size();
473       out.writeInt(length);
474       for (int i = 0; i < length; i++) {
475         Object elem = list.get(i);
476         writeObject(out, elem,
477                   elem == null ? Writable.class : elem.getClass(), conf);
478       }
479     } else if (declClass == String.class) {   // String
480       Text.writeString(out, (String)instanceObj);
481     } else if (declClass.isPrimitive()) {     // primitive type
482       if (declClass == Boolean.TYPE) {        // boolean
483         out.writeBoolean(((Boolean)instanceObj).booleanValue());
484       } else if (declClass == Character.TYPE) { // char
485         out.writeChar(((Character)instanceObj).charValue());
486       } else if (declClass == Byte.TYPE) {    // byte
487         out.writeByte(((Byte)instanceObj).byteValue());
488       } else if (declClass == Short.TYPE) {   // short
489         out.writeShort(((Short)instanceObj).shortValue());
490       } else if (declClass == Integer.TYPE) { // int
491         out.writeInt(((Integer)instanceObj).intValue());
492       } else if (declClass == Long.TYPE) {    // long
493         out.writeLong(((Long)instanceObj).longValue());
494       } else if (declClass == Float.TYPE) {   // float
495         out.writeFloat(((Float)instanceObj).floatValue());
496       } else if (declClass == Double.TYPE) {  // double
497         out.writeDouble(((Double)instanceObj).doubleValue());
498       } else if (declClass == Void.TYPE) {    // void
499       } else {
500         throw new IllegalArgumentException("Not a primitive: "+declClass);
501       }
502     } else if (declClass.isEnum()) {         // enum
503       Text.writeString(out, ((Enum)instanceObj).name());
504     } else if (Message.class.isAssignableFrom(declaredClass)) {
505       Text.writeString(out, instanceObj.getClass().getName());
506       ((Message)instance).writeDelimitedTo(
507           DataOutputOutputStream.constructOutputStream(out));
508     } else if (Writable.class.isAssignableFrom(declClass)) { // Writable
509       Class <?> c = instanceObj.getClass();
510       Integer code = CLASS_TO_CODE.get(c);
511       if (code == null) {
512         out.writeByte(NOT_ENCODED);
513         Text.writeString(out, c.getName());
514       } else {
515         writeClassCode(out, c);
516       }
517       ((Writable)instanceObj).write(out);
518     } else if (Serializable.class.isAssignableFrom(declClass)) {
519       Class <?> c = instanceObj.getClass();
520       Integer code = CLASS_TO_CODE.get(c);
521       if (code == null) {
522         out.writeByte(NOT_ENCODED);
523         Text.writeString(out, c.getName());
524       } else {
525         writeClassCode(out, c);
526       }
527       ByteArrayOutputStream bos = null;
528       ObjectOutputStream oos = null;
529       try{
530         bos = new ByteArrayOutputStream();
531         oos = new ObjectOutputStream(bos);
532         oos.writeObject(instanceObj);
533         byte[] value = bos.toByteArray();
534         out.writeInt(value.length);
535         out.write(value);
536       } finally {
537         if(bos!=null) bos.close();
538         if(oos!=null) oos.close();
539       }
540     } else if (Scan.class.isAssignableFrom(declClass)) {
541       Scan scan = (Scan)instanceObj;
542       byte [] scanBytes = ProtobufUtil.toScan(scan).toByteArray();
543       out.writeInt(scanBytes.length);
544       out.write(scanBytes);
545     } else if (Entry.class.isAssignableFrom(declClass)) {
546       // Entry is no longer Writable, maintain compatible serialization.
547       // Writables write their exact runtime class
548       Class <?> c = instanceObj.getClass();
549       Integer code = CLASS_TO_CODE.get(c);
550       if (code == null) {
551         out.writeByte(NOT_ENCODED);
552         Text.writeString(out, c.getName());
553       } else {
554         writeClassCode(out, c);
555       }
556       final Entry entry = (Entry)instanceObj;
557       // We only support legacy HLogKey
558       WALKey key = entry.getKey();
559       if (!(key instanceof HLogKey)) {
560         throw new IOException("Can't write Entry '" + instanceObj + "' due to key class '" +
561             key.getClass() + "'");
562       }
563       ((HLogKey)key).write(out);
564       entry.getEdit().write(out);
565     } else {
566       throw new IOException("Can't write: "+instanceObj+" as "+declClass);
567     }
568   }
569 
570   /** Writes the encoded class code as defined in CLASS_TO_CODE, or
571    * the whole class name if not defined in the mapping.
572    */
573   static void writeClass(DataOutput out, Class<?> c) throws IOException {
574     Integer code = CLASS_TO_CODE.get(c);
575     if (code == null) {
576       WritableUtils.writeVInt(out, NOT_ENCODED);
577       Text.writeString(out, c.getName());
578     } else {
579       WritableUtils.writeVInt(out, code);
580     }
581   }
582 
583   /** Reads and returns the class as written by {@link #writeClass(DataOutput, Class)} */
584   static Class<?> readClass(Configuration conf, DataInput in) throws IOException {
585     Class<?> instanceClass = null;
586     int b = (byte)WritableUtils.readVInt(in);
587     if (b == NOT_ENCODED) {
588       String className = Text.readString(in);
589       try {
590         instanceClass = getClassByName(conf, className);
591       } catch (ClassNotFoundException e) {
592         LOG.error("Can't find class " + className, e);
593         throw new IOException("Can't find class " + className, e);
594       }
595     } else {
596       instanceClass = CODE_TO_CLASS.get(b);
597     }
598     return instanceClass;
599   }
600 
601   /**
602    * Read a {@link Writable}, {@link String}, primitive type, or an array of
603    * the preceding.
604    * @param in
605    * @param conf
606    * @return the object
607    * @throws IOException
608    */
609   static Object readObject(DataInput in, Configuration conf)
610     throws IOException {
611     return readObject(in, null, conf);
612   }
613 
614   /**
615    * Read a {@link Writable}, {@link String}, primitive type, or an array of
616    * the preceding.
617    * @param in
618    * @param objectWritable
619    * @param conf
620    * @return the object
621    * @throws IOException
622    */
623   @SuppressWarnings("unchecked")
624   static Object readObject(DataInput in,
625       HbaseObjectWritableFor96Migration objectWritable, Configuration conf)
626   throws IOException {
627     Class<?> declaredClass = CODE_TO_CLASS.get(WritableUtils.readVInt(in));
628     Object instance;
629     if (declaredClass.isPrimitive()) {            // primitive types
630       if (declaredClass == Boolean.TYPE) {             // boolean
631         instance = Boolean.valueOf(in.readBoolean());
632       } else if (declaredClass == Character.TYPE) {    // char
633         instance = Character.valueOf(in.readChar());
634       } else if (declaredClass == Byte.TYPE) {         // byte
635         instance = Byte.valueOf(in.readByte());
636       } else if (declaredClass == Short.TYPE) {        // short
637         instance = Short.valueOf(in.readShort());
638       } else if (declaredClass == Integer.TYPE) {      // int
639         instance = Integer.valueOf(in.readInt());
640       } else if (declaredClass == Long.TYPE) {         // long
641         instance = Long.valueOf(in.readLong());
642       } else if (declaredClass == Float.TYPE) {        // float
643         instance = Float.valueOf(in.readFloat());
644       } else if (declaredClass == Double.TYPE) {       // double
645         instance = Double.valueOf(in.readDouble());
646       } else if (declaredClass == Void.TYPE) {         // void
647         instance = null;
648       } else {
649         throw new IllegalArgumentException("Not a primitive: "+declaredClass);
650       }
651     } else if (declaredClass.isArray()) {              // array
652       if (declaredClass.equals(byte [].class)) {
653         instance = Bytes.readByteArray(in);
654       } else {
655         int length = in.readInt();
656         instance = Array.newInstance(declaredClass.getComponentType(), length);
657         for (int i = 0; i < length; i++) {
658           Array.set(instance, i, readObject(in, conf));
659         }
660       }
661     } else if (declaredClass.equals(Array.class)) { //an array not declared in CLASS_TO_CODE
662       Class<?> componentType = readClass(conf, in);
663       int length = in.readInt();
664       instance = Array.newInstance(componentType, length);
665       for (int i = 0; i < length; i++) {
666         Array.set(instance, i, readObject(in, conf));
667       }
668     } else if (List.class.isAssignableFrom(declaredClass)) {            // List
669       int length = in.readInt();
670       instance = new ArrayList(length);
671       for (int i = 0; i < length; i++) {
672         ((ArrayList)instance).add(readObject(in, conf));
673       }
674     } else if (declaredClass == String.class) {        // String
675       instance = Text.readString(in);
676     } else if (declaredClass.isEnum()) {         // enum
677       instance = Enum.valueOf((Class<? extends Enum>) declaredClass,
678         Text.readString(in));
679     } else if (declaredClass == Message.class) {
680       String className = Text.readString(in);
681       try {
682         declaredClass = getClassByName(conf, className);
683         instance = tryInstantiateProtobuf(declaredClass, in);
684       } catch (ClassNotFoundException e) {
685         LOG.error("Can't find class " + className, e);
686         throw new IOException("Can't find class " + className, e);
687       }
688     } else if (Scan.class.isAssignableFrom(declaredClass)) {
689       int length = in.readInt();
690       byte [] scanBytes = new byte[length];
691       in.readFully(scanBytes);
692       ClientProtos.Scan.Builder scanProto = ClientProtos.Scan.newBuilder();
693       instance = ProtobufUtil.toScan(scanProto.mergeFrom(scanBytes).build());
694     } else {                                      // Writable or Serializable
695       Class instanceClass = null;
696       int b = (byte)WritableUtils.readVInt(in);
697       if (b == NOT_ENCODED) {
698         String className = Text.readString(in);
699         if ("org.apache.hadoop.hbase.regionserver.wal.HLog$Entry".equals(className)) {
700           className = Entry.class.getName();
701         }
702         try {
703           instanceClass = getClassByName(conf, className);
704         } catch (ClassNotFoundException e) {
705           LOG.error("Can't find class " + className, e);
706           throw new IOException("Can't find class " + className, e);
707         }
708       } else {
709         instanceClass = CODE_TO_CLASS.get(b);
710       }
711       if(Writable.class.isAssignableFrom(instanceClass)){
712         Writable writable = WritableFactories.newInstance(instanceClass, conf);
713         try {
714           writable.readFields(in);
715         } catch (Exception e) {
716           LOG.error("Error in readFields", e);
717           throw new IOException("Error in readFields" , e);
718         }
719         instance = writable;
720         if (instanceClass == NullInstance.class) {  // null
721           declaredClass = ((NullInstance)instance).declaredClass;
722           instance = null;
723         }
724       } else if (Entry.class.isAssignableFrom(instanceClass)) {
725         // Entry stopped being Writable; maintain serialization support.
726         final HLogKey key = new HLogKey();
727         final WALEdit edit = new WALEdit();
728         key.readFields(in);
729         edit.readFields(in);
730         instance = new Entry(key, edit);
731       } else {
732         int length = in.readInt();
733         byte[] objectBytes = new byte[length];
734         in.readFully(objectBytes);
735         ByteArrayInputStream bis = null;
736         ObjectInputStream ois = null;
737         try {
738           bis = new ByteArrayInputStream(objectBytes);
739           ois = new ObjectInputStream(bis);
740           instance = ois.readObject();
741         } catch (ClassNotFoundException e) {
742           LOG.error("Class not found when attempting to deserialize object", e);
743           throw new IOException("Class not found when attempting to " +
744               "deserialize object", e);
745         } finally {
746           if(bis!=null) bis.close();
747           if(ois!=null) ois.close();
748         }
749       }
750     }
751     if (objectWritable != null) {                 // store values
752       objectWritable.declaredClass = declaredClass;
753       objectWritable.instance = instance;
754     }
755     return instance;
756   }
757 
758   /**
759    * Try to instantiate a protocol buffer of the given message class
760    * from the given input stream.
761    *
762    * @param protoClass the class of the generated protocol buffer
763    * @param dataIn the input stream to read from
764    * @return the instantiated Message instance
765    * @throws IOException if an IO problem occurs
766    */
767   static Message tryInstantiateProtobuf(
768       Class<?> protoClass,
769       DataInput dataIn) throws IOException {
770 
771     try {
772       if (dataIn instanceof InputStream) {
773         // We can use the built-in parseDelimitedFrom and not have to re-copy
774         // the data
775         Method parseMethod = getStaticProtobufMethod(protoClass,
776             "parseDelimitedFrom", InputStream.class);
777         return (Message)parseMethod.invoke(null, (InputStream)dataIn);
778       } else {
779         // Have to read it into a buffer first, since protobuf doesn't deal
780         // with the DataInput interface directly.
781 
782         // Read the size delimiter that writeDelimitedTo writes
783         int size = ProtoUtil.readRawVarint32(dataIn);
784         if (size < 0) {
785           throw new IOException("Invalid size: " + size);
786         }
787 
788         byte[] data = new byte[size];
789         dataIn.readFully(data);
790         Method parseMethod = getStaticProtobufMethod(protoClass,
791             "parseFrom", byte[].class);
792         return (Message)parseMethod.invoke(null, data);
793       }
794     } catch (InvocationTargetException e) {
795 
796       if (e.getCause() instanceof IOException) {
797         throw (IOException)e.getCause();
798       } else {
799         throw new IOException(e.getCause());
800       }
801     } catch (IllegalAccessException iae) {
802       throw new AssertionError("Could not access parse method in " +
803           protoClass);
804     }
805   }
806 
807   static Method getStaticProtobufMethod(Class<?> declaredClass, String method,
808       Class<?> ... args) {
809 
810     try {
811       return declaredClass.getMethod(method, args);
812     } catch (Exception e) {
813       // This is a bug in Hadoop - protobufs should all have this static method
814       throw new AssertionError("Protocol buffer class " + declaredClass +
815           " does not have an accessible parseFrom(InputStream) method!");
816     }
817   }
818 
819   @SuppressWarnings("unchecked")
820   private static Class getClassByName(Configuration conf, String className)
821   throws ClassNotFoundException {
822     if(conf != null) {
823       return conf.getClassByName(className);
824     }
825     ClassLoader cl = Thread.currentThread().getContextClassLoader();
826     if(cl == null) {
827       cl = HbaseObjectWritableFor96Migration.class.getClassLoader();
828     }
829     return Class.forName(className, true, cl);
830   }
831 
832   private static void addToMap(final Class<?> clazz, final int code) {
833     CLASS_TO_CODE.put(clazz, code);
834     CODE_TO_CLASS.put(code, clazz);
835   }
836 
837   public void setConf(Configuration conf) {
838     this.conf = conf;
839   }
840 
841   public Configuration getConf() {
842     return this.conf;
843   }
844 }