001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, software 013 * distributed under the License is distributed on an "AS IS" BASIS, 014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 015 * See the License for the specific language governing permissions and 016 * limitations under the License. 017 */ 018package org.apache.hadoop.hbase.client; 019 020import java.io.IOException; 021import java.util.ArrayDeque; 022import java.util.Queue; 023import org.apache.hadoop.hbase.client.metrics.ScanMetrics; 024 025import org.apache.hbase.thirdparty.com.google.common.base.Throwables; 026 027/** 028 * A scan result consumer which buffers all the data in memory and you can call the {@link #take()} 029 * method below to get the result one by one. Should only be used by tests, do not write production 030 * code like this as the buffer is unlimited and may cause OOM. 031 */ 032class BufferingScanResultConsumer implements AdvancedScanResultConsumer { 033 034 private ScanMetrics scanMetrics; 035 036 private final Queue<Result> queue = new ArrayDeque<>(); 037 038 private boolean finished; 039 040 private Throwable error; 041 042 @Override 043 public void onScanMetricsCreated(ScanMetrics scanMetrics) { 044 this.scanMetrics = scanMetrics; 045 } 046 047 @Override 048 public synchronized void onNext(Result[] results, ScanController controller) { 049 for (Result result : results) { 050 queue.offer(result); 051 } 052 notifyAll(); 053 } 054 055 @Override 056 public synchronized void onError(Throwable error) { 057 finished = true; 058 this.error = error; 059 notifyAll(); 060 } 061 062 @Override 063 public synchronized void onComplete() { 064 finished = true; 065 notifyAll(); 066 } 067 068 public synchronized Result take() throws IOException, InterruptedException { 069 for (;;) { 070 if (!queue.isEmpty()) { 071 return queue.poll(); 072 } 073 if (finished) { 074 if (error != null) { 075 Throwables.propagateIfPossible(error, IOException.class); 076 throw new IOException(error); 077 } else { 078 return null; 079 } 080 } 081 wait(); 082 } 083 } 084 085 public ScanMetrics getScanMetrics() { 086 return scanMetrics; 087 } 088}