/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
019
020package org.apache.hadoop.hbase.rest;
021
022import java.io.IOException;
023import java.util.Iterator;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.TableNotEnabledException;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.UnknownScannerException;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.ResultScanner;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.filter.Filter;
035import org.apache.hadoop.hbase.rest.model.ScannerModel;
036import org.apache.hadoop.hbase.security.visibility.Authorizations;
037import org.apache.hadoop.util.StringUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@InterfaceAudience.Private
043public class ScannerResultGenerator extends ResultGenerator {
044
045  private static final Logger LOG =
046    LoggerFactory.getLogger(ScannerResultGenerator.class);
047
048  public static Filter buildFilterFromModel(final ScannerModel model) 
049      throws Exception {
050    String filter = model.getFilter();
051    if (filter == null || filter.length() == 0) {
052      return null;
053    }
054    return buildFilter(filter);
055  }
056
057  private String id;
058  private Iterator<Cell> rowI;
059  private Cell cache;
060  private ResultScanner scanner;
061  private Result cached;
062
063  public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
064      final Filter filter, final boolean cacheBlocks)
065      throws IllegalArgumentException, IOException {
066    this(tableName, rowspec, filter, -1, cacheBlocks);
067  }
068
069  public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
070    final Filter filter, final int caching, final boolean cacheBlocks)
071    throws IllegalArgumentException, IOException {
072    this(tableName, rowspec, filter, caching, cacheBlocks, -1);
073  }
074
075  public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
076    final Filter filter, final int caching ,final boolean cacheBlocks, int limit) throws IOException {
077    Table table = RESTServlet.getInstance().getTable(tableName);
078    try {
079      Scan scan;
080      if (rowspec.hasEndRow()) {
081        scan = new Scan().withStartRow(rowspec.getStartRow()).withStopRow(rowspec.getEndRow());
082      } else {
083        scan = new Scan().withStartRow(rowspec.getStartRow());
084      }
085      if (rowspec.hasColumns()) {
086        byte[][] columns = rowspec.getColumns();
087        for (byte[] column: columns) {
088          byte[][] split = CellUtil.parseColumn(column);
089          if (split.length == 1) {
090            scan.addFamily(split[0]);
091          } else if (split.length == 2) {
092            scan.addColumn(split[0], split[1]);
093          } else {
094            throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
095          }
096        }
097      }
098      scan.setTimeRange(rowspec.getStartTime(), rowspec.getEndTime());
099      scan.readVersions(rowspec.getMaxVersions());
100      if (filter != null) {
101        scan.setFilter(filter);
102      }
103      if (caching > 0 ) {
104        scan.setCaching(caching);
105      }
106      if (limit > 0) {
107        scan.setLimit(limit);
108      }
109      scan.setCacheBlocks(cacheBlocks);
110      if (rowspec.hasLabels()) {
111        scan.setAuthorizations(new Authorizations(rowspec.getLabels()));
112      }
113      scanner = table.getScanner(scan);
114      cached = null;
115      id = Long.toString(System.currentTimeMillis()) +
116             Integer.toHexString(scanner.hashCode());
117    } finally {
118      table.close();
119    }
120  }
121
122  public String getID() {
123    return id;
124  }
125
126  @Override
127  public void close() {
128    if (scanner != null) {
129      scanner.close();
130      scanner = null;
131    }
132  }
133
134  @Override
135  public boolean hasNext() {
136    if (cache != null) {
137      return true;
138    }
139    if (rowI != null && rowI.hasNext()) {
140      return true;
141    }
142    if (cached != null) {
143      return true;
144    }
145    try {
146      Result result = scanner.next();
147      if (result != null && !result.isEmpty()) {
148        cached = result;
149      }
150    } catch (UnknownScannerException e) {
151      throw new IllegalArgumentException(e);
152    } catch (IOException e) {
153      LOG.error(StringUtils.stringifyException(e));
154    }
155    return cached != null;
156  }
157
158  @Override
159  public Cell next() {
160    if (cache != null) {
161      Cell kv = cache;
162      cache = null;
163      return kv;
164    }
165    boolean loop;
166    do {
167      loop = false;
168      if (rowI != null) {
169        if (rowI.hasNext()) {
170          return rowI.next();
171        } else {
172          rowI = null;
173        }
174      }
175      if (cached != null) {
176        rowI = cached.listCells().iterator();
177        loop = true;
178        cached = null;
179      } else {
180        Result result = null;
181        try {
182          result = scanner.next();
183        } catch (UnknownScannerException e) {
184          throw new IllegalArgumentException(e);
185        } catch (TableNotEnabledException tnee) {
186          throw new IllegalStateException(tnee);
187        } catch (TableNotFoundException tnfe) {
188          throw new IllegalArgumentException(tnfe);
189        } catch (IOException e) {
190          LOG.error(StringUtils.stringifyException(e));
191        }
192        if (result != null && !result.isEmpty()) {
193          rowI = result.listCells().iterator();
194          loop = true;
195        }
196      }
197    } while (loop);
198    return null;
199  }
200
201  @Override
202  public void putBack(Cell kv) {
203    this.cache = kv;
204  }
205
206  @Override
207  public void remove() {
208    throw new UnsupportedOperationException("remove not supported");
209  }
210}