001/** 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.codec; 019 020import java.io.EOFException; 021import java.io.IOException; 022import java.io.InputStream; 023import java.io.PushbackInputStream; 024 025import edu.umd.cs.findbugs.annotations.NonNull; 026 027import org.apache.hadoop.hbase.Cell; 028import org.apache.hadoop.hbase.HBaseInterfaceAudience; 029import org.apache.yetus.audience.InterfaceAudience; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * TODO javadoc 035 */ 036@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX}) 037public abstract class BaseDecoder implements Codec.Decoder { 038 protected static final Logger LOG = LoggerFactory.getLogger(BaseDecoder.class); 039 040 protected final InputStream in; 041 private Cell current = null; 042 043 protected static class PBIS extends PushbackInputStream { 044 public PBIS(InputStream in, int size) { 045 super(in, size); 046 } 047 048 public void resetBuf(int size) { 049 this.buf = new byte[size]; 050 this.pos = size; 051 } 052 } 053 054 public BaseDecoder(final InputStream in) { 055 this.in = new PBIS(in, 1); 056 } 057 058 @Override 059 public boolean advance() throws IOException { 060 int firstByte = in.read(); 061 if (firstByte == -1) { 062 return false; 063 } else { 064 ((PBIS)in).unread(firstByte); 065 } 066 067 try { 068 this.current = parseCell(); 069 } catch (IOException ioEx) { 070 ((PBIS)in).resetBuf(1); // reset the buffer in case the underlying stream is read from upper layers 071 rethrowEofException(ioEx); 072 } 073 return true; 074 } 075 076 private void rethrowEofException(IOException ioEx) throws IOException { 077 boolean isEof = false; 078 try { 079 isEof = this.in.available() == 0; 080 } catch (Throwable t) { 081 LOG.trace("Error getting available for error message - ignoring", t); 082 } 083 if (!isEof) throw ioEx; 084 if (LOG.isTraceEnabled()) { 085 LOG.trace("Partial cell read caused by EOF", ioEx); 086 } 087 EOFException eofEx = new EOFException("Partial cell read"); 088 eofEx.initCause(ioEx); 089 throw eofEx; 090 } 091 092 protected InputStream getInputStream() { 093 return in; 094 } 095 096 /** 097 * Extract a Cell. 098 * @return a parsed Cell or throws an Exception. EOFException or a generic IOException maybe 099 * thrown if EOF is reached prematurely. Does not return null. 100 * @throws IOException 101 */ 102 @NonNull 103 protected abstract Cell parseCell() throws IOException; 104 105 @Override 106 public Cell current() { 107 return this.current; 108 } 109}