/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * A {@link ScanResultCache} that hands back only complete results.
 * <p>
 * Partial results are buffered until either a result for a different row shows up or an empty,
 * non-heartbeat response arrives; at that point the buffered pieces are merged into a single
 * {@link Result} via {@link Result#createCompleteResult} and returned ahead of any other results
 * from the same response.
 */
@InterfaceAudience.Private
class CompleteScanResultCache implements ScanResultCache {

  /** Pieces received so far for the row that is still being assembled. */
  private final List<Result> pendingPartials = new ArrayList<>();

  /** Sum of the lengths of all arrays returned through {@link #tally(Result...)}. */
  private int numberOfCompleteRows;

  /**
   * Feeds one batch of results into the cache and returns whatever is now complete.
   *
   * @param results the results of one response; the code relies on at most one partial result
   *          being present and, if present, on it being the last element
   * @param isHeartbeatMessage whether the response was a heartbeat; an empty heartbeat never
   *          flushes the buffered partials
   * @return the results that are complete after this call, possibly {@code EMPTY_RESULT_ARRAY}
   * @throws IOException if merging the buffered partials fails
   */
  @Override
  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
    final int count = results.length;

    if (count == 0) {
      // An empty heartbeat only keeps the connection alive, so more partials may still follow
      // and the buffer must be kept. Any other empty response means the buffered pieces (if
      // any) already form the whole row.
      boolean nothingToFlush = pendingPartials.isEmpty() || isHeartbeatMessage;
      if (nothingToFlush) {
        return EMPTY_RESULT_ARRAY;
      }
      return tally(mergePending());
    }

    final Result tail = results[count - 1];

    if (!tail.mayHaveMoreCellsInRow()) {
      // Everything in this batch is complete; flush the buffer in front of it when needed.
      if (pendingPartials.isEmpty()) {
        return tally(results);
      }
      return tally(flushPendingBefore(results, count));
    }

    // From here on the last element is a partial result that has to be buffered.
    if (pendingPartials.isEmpty()) {
      pendingPartials.add(tail);
      return tally(Arrays.copyOf(results, count - 1));
    }

    if (count > 1) {
      // There are results in front of the new partial: emit the merged buffer plus those.
      Result[] ready = flushPendingBefore(results, count - 1);
      pendingPartials.add(tail);
      return tally(ready);
    }

    // The batch consists of the partial result alone.
    if (sameRowAsPending(tail)) {
      // Still the same row: keep collecting.
      pendingPartials.add(tail);
      return EMPTY_RESULT_ARRAY;
    }
    // Row changed: the buffered row is finished and the new partial starts a fresh buffer.
    Result finished = mergePending();
    pendingPartials.add(tail);
    return tally(finished);
  }

  /** Drops all buffered partial results; the complete-row counter is left untouched. */
  @Override
  public void clear() {
    pendingPartials.clear();
  }

  /** @return the accumulated number of results returned by {@link #addAndGet} so far */
  @Override
  public int numberOfCompleteRows() {
    return numberOfCompleteRows;
  }

  /**
   * Builds the array to return when the buffer must be flushed in front of the first
   * {@code length} elements of {@code results}.
   * <p>
   * The final piece of a row is not necessarily flagged as partial, so the first element is
   * compared against the buffered row: if it belongs to the same row it is folded into the
   * merged result instead of being returned on its own.
   *
   * @param results the batch currently being processed
   * @param length how many leading elements of {@code results} to consider
   * @return the merged buffered row followed by the remaining considered elements
   * @throws IOException if merging the buffered partials fails
   */
  private Result[] flushPendingBefore(Result[] results, int length) throws IOException {
    if (length == 0) {
      return new Result[] { mergePending() };
    }
    final boolean continuesPendingRow = sameRowAsPending(results[0]);
    if (continuesPendingRow) {
      pendingPartials.add(results[0]);
    }
    final int skipped = continuesPendingRow ? 1 : 0;
    final int carried = length - skipped;
    Result[] flushed = new Result[carried + 1];
    flushed[0] = mergePending();
    System.arraycopy(results, skipped, flushed, 1, carried);
    return flushed;
  }

  /** Tells whether {@code candidate} has the same row key as the first buffered partial. */
  private boolean sameRowAsPending(Result candidate) {
    return Bytes.equals(pendingPartials.get(0).getRow(), candidate.getRow());
  }

  /**
   * Merges the buffered partials into one result and then empties the buffer.
   *
   * @throws IOException if {@link Result#createCompleteResult} fails; the buffer is not cleared
   *           in that case
   */
  private Result mergePending() throws IOException {
    Result merged = Result.createCompleteResult(pendingPartials);
    pendingPartials.clear();
    return merged;
  }

  /** Adds the number of given results to the complete-row counter and passes them through. */
  private Result[] tally(Result... completed) {
    numberOfCompleteRows += completed.length;
    return completed;
  }
}