/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A scan result cache for batched scan, i.e.,
 * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
 * <p>
 * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user. setBatch
 * doesn't mean setAllowPartialResults(true).
 * @since 2.0.0
 */
@InterfaceAudience.Private
public class BatchScanResultCache implements ScanResultCache {

  /** Number of cells each regrouped {@link Result} handed back to the user should carry. */
  private final int batch;

  // Used to filter out the cells that were already returned to the user, as we always start from
  // the beginning of a row when retrying.
  private Cell lastCell;

  // Value of mayHaveMoreCellsInRow() on the last regrouped result recorded by recordLastResult.
  private boolean lastResultPartial;

  // Results of the current row that have been received but not yet handed back, oldest first.
  private final Deque<Result> partialResults = new ArrayDeque<>();

  // Total number of cells currently held in partialResults.
  private int numCellsOfPartialResults;

  // Running count of rows considered complete; exposed through numberOfCompleteRows().
  private int numberOfCompleteRows;

  /**
   * @param batch the number of cells per returned result; the arithmetic in this class assumes a
   *          positive value (see the class comment), it is not validated here
   */
  public BatchScanResultCache(int batch) {
    this.batch = batch;
  }

  /**
   * Remembers the last cell of {@code result} and whether the result was flagged as possibly
   * having more cells in its row.
   * @param result a result that is being returned to the caller; must contain at least one cell
   */
  private void recordLastResult(Result result) {
    Cell[] rawCells = result.rawCells();
    lastCell = rawCells[rawCells.length - 1];
    lastResultPartial = result.mayHaveMoreCellsInRow();
  }

  /**
   * Merges everything buffered in {@link #partialResults} into a single result, counts the row as
   * complete and resets the buffer.
   * <p>
   * NOTE(review): this does not touch {@code lastCell}/{@code lastResultPartial}. If the previous
   * regrouped result of the same row was recorded as partial, the row-change branch in
   * {@link #addAndGet(Result[], boolean)} looks like it could bump {@code numberOfCompleteRows} a
   * second time for that row -- confirm whether that is intended.
   * @return the merged result built by {@code Result.createCompleteResult}
   */
  private Result createCompletedResult() throws IOException {
    numberOfCompleteRows++;
    Result result = Result.createCompleteResult(partialResults);
    partialResults.clear();
    numCellsOfPartialResults = 0;
    return result;
  }

  /**
   * Adds a new result to the partial list and returns a batched Result if the number of buffered
   * cells reaches the batching limit. As the RS will also respect scan.getBatch, we can make sure
   * that we will get at most one Result back (or null, which means we do not have enough cells).
   * @param result the freshly received result, which is appended to the buffer
   * @return a result holding exactly {@code batch} cells, or {@code null} when fewer than
   *         {@code batch} cells are buffered
   */
  private Result regroupResults(Result result) {
    partialResults.addLast(result);
    numCellsOfPartialResults += result.size();
    if (numCellsOfPartialResults < batch) {
      return null;
    }
    Cell[] cells = new Cell[batch];
    int cellCount = 0;
    boolean stale = false;
    for (;;) {
      Result r = partialResults.pollFirst();
      stale = stale || r.isStale();
      Cell[] rawCells = r.rawCells();
      int newCellCount = cellCount + r.size();
      if (newCellCount > batch) {
        // We have more cells than expected, so split the current result: the head fills up the
        // batch and the tail goes back to the front of the buffer.
        int len = batch - cellCount;
        System.arraycopy(rawCells, 0, cells, cellCount, len);
        Cell[] remainingCells = new Cell[r.size() - len];
        System.arraycopy(rawCells, len, remainingCells, 0, r.size() - len);
        partialResults.addFirst(
          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
        break;
      }
      System.arraycopy(rawCells, 0, cells, cellCount, r.size());
      if (newCellCount == batch) {
        break;
      }
      cellCount = newCellCount;
    }
    numCellsOfPartialResults -= batch;
    // The batch is flagged as partial if the incoming result was, or if cells are still buffered.
    return Result.create(cells, null, stale,
      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
  }

  /**
   * Feeds the results of one rpc into the cache and returns the results that are ready to be
   * handed back.
   * @param results the results received; may be empty
   * @param isHeartbeatMessage whether the response was a heartbeat; an empty heartbeat response
   *          leaves the cache untouched
   * @return the regrouped results, possibly an empty array
   */
  @Override
  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
    if (results.length == 0) {
      if (!isHeartbeatMessage) {
        if (!partialResults.isEmpty()) {
          return new Result[] { createCompletedResult() };
        }
        if (lastResultPartial) {
          // An empty non heartbeat result indicates that there must be a row change. So if the
          // lastResultPartial is true then we need to increase numberOfCompleteRows.
          numberOfCompleteRows++;
        }
      }
      return EMPTY_RESULT_ARRAY;
    }
    List<Result> regroupedResults = new ArrayList<>();
    for (Result result : results) {
      result = filterCells(result, lastCell);
      if (result == null) {
        // Presumably nothing new is left in this result after filtering; skip it.
        continue;
      }
      if (!partialResults.isEmpty()) {
        if (!Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
          // There is a row change: flush what we buffered for the previous row. After this the
          // buffer is empty, so no further row-change check is needed before regrouping.
          regroupedResults.add(createCompletedResult());
        }
      } else if (lastResultPartial && !CellUtil.matchingRows(lastCell, result.getRow())) {
        // As for batched scan we may return partial results to user if we reach the batch limit, so
        // here we need to use lastCell to determine if there is row change and increase
        // numberOfCompleteRows.
        numberOfCompleteRows++;
      }
      Result regroupedResult = regroupResults(result);
      if (regroupedResult != null) {
        if (!regroupedResult.mayHaveMoreCellsInRow()) {
          numberOfCompleteRows++;
        }
        regroupedResults.add(regroupedResult);
        // only update last cell when we actually return it to user.
        recordLastResult(regroupedResult);
      }
      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
        // We are done for this row
        regroupedResults.add(createCompletedResult());
      }
    }
    return regroupedResults.toArray(new Result[0]);
  }

  /**
   * Drops all buffered partial results. {@code lastCell}, {@code lastResultPartial} and the
   * complete-row counter are left as they are.
   */
  @Override
  public void clear() {
    partialResults.clear();
    numCellsOfPartialResults = 0;
  }

  /**
   * @return the number of rows counted as complete so far
   */
  @Override
  public int numberOfCompleteRows() {
    return numberOfCompleteRows;
  }
}