/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.ExtendedCell;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

/**
 * A scan result cache for batched scan, i.e.,
 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
 * <p>
 * If the user calls setBatch(5) and the rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1
 * to the user. setBatch doesn't mean setAllowPartialResults(true).
 * @since 2.0.0
 */
@InterfaceAudience.Private
public class BatchScanResultCache implements ScanResultCache {

  /** Number of cells each regrouped Result handed back to the caller should contain. */
  private final int batch;

  // Used to filter out the cells that were already returned to the user, as we always start from
  // the beginning of a row when retrying.
  private ExtendedCell lastCell;

  // Value of mayHaveMoreCellsInRow() on the Result most recently passed to recordLastResult.
  private boolean lastResultPartial;

  // Results (all of the same row) that are buffered until at least 'batch' cells are available
  // or the row ends.
  private final Deque<Result> partialResults = new ArrayDeque<>();

  // Total number of cells currently held in partialResults.
  private int numCellsOfPartialResults;

  private int numberOfCompleteRows;

  /**
   * @param batch the number of cells to pack into each regrouped Result
   */
  public BatchScanResultCache(int batch) {
    this.batch = batch;
  }

  /**
   * Remembers the last cell of {@code result} and whether {@code result} was flagged as having
   * more cells in its row. The result is expected to contain at least one cell.
   */
  private void recordLastResult(Result result) {
    ExtendedCell[] cells = result.rawExtendedCells();
    lastCell = cells[cells.length - 1];
    lastResultPartial = result.mayHaveMoreCellsInRow();
  }

  /**
   * Merges everything buffered in {@link #partialResults} into a single Result, empties the
   * buffer, and counts one more complete row.
   * <p>
   * NOTE(review): this does not touch {@link #lastCell} / {@link #lastResultPartial}. If the
   * previous batch returned by regroupResults was recorded as partial and the tail of that row is
   * then flushed here, the first result of the next row appears to hit the
   * {@code lastResultPartial} branch in addAndGet and count the same row a second time. Confirm
   * whether that is intended.
   */
  private Result createCompletedResult() throws IOException {
    numberOfCompleteRows++;
    Result result = Result.createCompleteResult(partialResults);
    partialResults.clear();
    numCellsOfPartialResults = 0;
    return result;
  }

  /**
   * Adds {@code result} to the buffered partial results and, once at least {@code batch} cells
   * are buffered, carves exactly {@code batch} cells off the front into a new Result.
   * <p>
   * As the RS will also respect scan.getBatch, we can make sure that we will get at most one
   * Result back.
   * @return a Result holding exactly {@code batch} cells, or {@code null} if fewer than
   *         {@code batch} cells are buffered so far
   */
  private Result regroupResults(Result result) {
    partialResults.addLast(result);
    numCellsOfPartialResults += result.size();
    if (numCellsOfPartialResults < batch) {
      return null;
    }
    ExtendedCell[] cells = new ExtendedCell[batch];
    int cellCount = 0;
    boolean stale = false;
    // At least 'batch' cells are buffered, so this loop fills 'cells' before the deque runs dry.
    for (;;) {
      Result r = partialResults.pollFirst();
      // The regrouped result is stale if any contributing result is stale.
      stale = stale || r.isStale();
      int newCellCount = cellCount + r.size();
      if (newCellCount > batch) {
        // We have more cells than expected, so split the current result: the head fills up the
        // batch, the tail goes back to the front of the buffer.
        int len = batch - cellCount;
        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
        ExtendedCell[] remainingCells = new ExtendedCell[r.size() - len];
        System.arraycopy(r.rawExtendedCells(), len, remainingCells, 0, r.size() - len);
        partialResults.addFirst(
          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
        break;
      }
      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
      if (newCellCount == batch) {
        break;
      }
      cellCount = newCellCount;
    }
    numCellsOfPartialResults -= batch;
    // The row still has more cells if the newest input says so, or if something is left buffered.
    return Result.create(cells, null, stale,
      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
  }

  /**
   * Feeds the results of one rpc into the cache and returns the Results that are ready for the
   * user, regrouped to {@code batch} cells each (the last Result of a row may be smaller).
   * @param results            the results returned by the region server, possibly empty
   * @param isHeartbeatMessage whether the rpc response was a heartbeat; an empty heartbeat
   *                           response leaves the cache state untouched
   * @return the regrouped results, possibly an empty array
   */
  @Override
  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
    if (results.length == 0) {
      if (!isHeartbeatMessage) {
        if (!partialResults.isEmpty()) {
          return new Result[] { createCompletedResult() };
        }
        if (lastResultPartial) {
          // An empty non heartbeat result indicates that there must be a row change. So if
          // lastResultPartial is true then we need to increase numberOfCompleteRows.
          numberOfCompleteRows++;
        }
      }
      return EMPTY_RESULT_ARRAY;
    }
    List<Result> regroupedResults = new ArrayList<>();
    for (Result result : results) {
      // Drop the cells which have already been returned to the user.
      result = filterCells(result, lastCell);
      if (result == null) {
        continue;
      }
      if (!partialResults.isEmpty()) {
        if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
          // There is a row change, flush what we have buffered for the previous row. After this
          // the buffer is empty, so no further row change check is needed below.
          regroupedResults.add(createCompletedResult());
        }
      } else if (lastResultPartial && !CellUtil.matchingRows(lastCell, result.getRow())) {
        // As for batched scan we may return partial results to user if we reach the batch limit,
        // so here we need to use lastCell to determine if there is a row change and increase
        // numberOfCompleteRows.
        numberOfCompleteRows++;
      }
      Result regroupedResult = regroupResults(result);
      if (regroupedResult != null) {
        if (!regroupedResult.mayHaveMoreCellsInRow()) {
          numberOfCompleteRows++;
        }
        regroupedResults.add(regroupedResult);
        // Only update last cell when we actually return it to user.
        recordLastResult(regroupedResult);
      }
      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
        // We are done for this row
        regroupedResults.add(createCompletedResult());
      }
    }
    return regroupedResults.toArray(new Result[0]);
  }

  /**
   * Discards the buffered partial results. The last returned cell and the complete row counter
   * are left as they are.
   */
  @Override
  public void clear() {
    partialResults.clear();
    numCellsOfPartialResults = 0;
  }

  /**
   * @return the number of rows counted as complete so far
   */
  @Override
  public int numberOfCompleteRows() {
    return numberOfCompleteRows;
  }
}