001/* 002 * Licensed to the Apache Software Foundation (ASF) under one or more contributor license 003 * agreements. See the NOTICE file distributed with this work for additional information regarding 004 * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the 005 * "License"); you may not use this file except in compliance with the License. You may obtain a 006 * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable 007 * law or agreed to in writing, software distributed under the License is distributed on an "AS IS" 008 * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License 009 * for the specific language governing permissions and limitations under the License. 010 */ 011package org.apache.hadoop.hbase.io.encoding; 012 013import java.io.DataOutputStream; 014import java.io.IOException; 015import org.apache.hadoop.hbase.Cell; 016import org.apache.hadoop.hbase.KeyValueUtil; 017import org.apache.hadoop.hbase.io.ByteArrayOutputStream; 018import org.apache.hadoop.hbase.util.Bytes; 019import org.apache.yetus.audience.InterfaceAudience; 020import org.slf4j.Logger; 021import org.slf4j.LoggerFactory; 022 023@InterfaceAudience.Private 024public class RowIndexEncoderV1 { 025 private static final Logger LOG = LoggerFactory.getLogger(RowIndexEncoderV1.class); 026 027 /** The Cell previously appended. */ 028 private Cell lastCell = null; 029 030 private DataOutputStream out; 031 private NoneEncoder encoder; 032 private int startOffset = -1; 033 private ByteArrayOutputStream rowsOffsetBAOS = new ByteArrayOutputStream(64 * 4); 034 private final HFileBlockEncodingContext context; 035 036 public RowIndexEncoderV1(DataOutputStream out, HFileBlockDefaultEncodingContext encodingCtx) { 037 this.out = out; 038 this.encoder = new NoneEncoder(out, encodingCtx); 039 this.context = encodingCtx; 040 } 041 042 public void write(Cell cell) throws IOException { 043 // checkRow uses comparator to check we are writing in order. 044 int extraBytesForRowIndex = 0; 045 046 if (!checkRow(cell)) { 047 if (startOffset < 0) { 048 startOffset = out.size(); 049 } 050 rowsOffsetBAOS.writeInt(out.size() - startOffset); 051 // added for the int written in the previous line 052 extraBytesForRowIndex = Bytes.SIZEOF_INT; 053 } 054 lastCell = cell; 055 int size = encoder.write(cell); 056 context.getEncodingState().postCellEncode(size, size + extraBytesForRowIndex); 057 } 058 059 protected boolean checkRow(final Cell cell) throws IOException { 060 boolean isDuplicateRow = false; 061 if (cell == null) { 062 throw new IOException("Key cannot be null or empty"); 063 } 064 if (lastCell != null) { 065 int keyComp = this.context.getHFileContext().getCellComparator().compareRows(lastCell, cell); 066 if (keyComp > 0) { 067 throw new IOException("Added a key not lexically larger than" 068 + " previous. Current cell = " + cell + ", lastCell = " + lastCell); 069 } else if (keyComp == 0) { 070 isDuplicateRow = true; 071 } 072 } 073 return isDuplicateRow; 074 } 075 076 public void flush() throws IOException { 077 int onDiskDataSize = 0; 078 if (startOffset >= 0) { 079 onDiskDataSize = out.size() - startOffset; 080 } 081 out.writeInt(rowsOffsetBAOS.size() / 4); 082 if (rowsOffsetBAOS.size() > 0) { 083 out.write(rowsOffsetBAOS.getBuffer(), 0, rowsOffsetBAOS.size()); 084 } 085 out.writeInt(onDiskDataSize); 086 if (LOG.isTraceEnabled()) { 087 LOG.trace("RowNumber: " + rowsOffsetBAOS.size() / 4 088 + ", onDiskDataSize: " + onDiskDataSize + ", totalOnDiskSize: " 089 + (out.size() - startOffset)); 090 } 091 } 092 093 void beforeShipped() { 094 if (this.lastCell != null) { 095 this.lastCell = KeyValueUtil.toNewKeyCell(this.lastCell); 096 } 097 } 098}