001/*
002 *
003 * Licensed to the Apache Software Foundation (ASF) under one
004 * or more contributor license agreements.  See the NOTICE file
005 * distributed with this work for additional information
006 * regarding copyright ownership.  The ASF licenses this file
007 * to you under the Apache License, Version 2.0 (the
008 * "License"); you may not use this file except in compliance
009 * with the License.  You may obtain a copy of the License at
010 *
011 *     http://www.apache.org/licenses/LICENSE-2.0
012 *
013 * Unless required by applicable law or agreed to in writing, software
014 * distributed under the License is distributed on an "AS IS" BASIS,
015 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
016 * See the License for the specific language governing permissions and
017 * limitations under the License.
018 */
019
020package org.apache.hadoop.hbase.rest;
021
022import java.io.IOException;
023import java.util.Iterator;
024
025import org.apache.hadoop.hbase.Cell;
026import org.apache.hadoop.hbase.CellUtil;
027import org.apache.hadoop.hbase.TableNotEnabledException;
028import org.apache.hadoop.hbase.TableNotFoundException;
029import org.apache.hadoop.hbase.UnknownScannerException;
030import org.apache.hadoop.hbase.client.Result;
031import org.apache.hadoop.hbase.client.ResultScanner;
032import org.apache.hadoop.hbase.client.Scan;
033import org.apache.hadoop.hbase.client.Table;
034import org.apache.hadoop.hbase.filter.Filter;
035import org.apache.hadoop.hbase.rest.model.ScannerModel;
036import org.apache.hadoop.hbase.security.visibility.Authorizations;
037import org.apache.hadoop.util.StringUtils;
038import org.apache.yetus.audience.InterfaceAudience;
039import org.slf4j.Logger;
040import org.slf4j.LoggerFactory;
041
042@InterfaceAudience.Private
043public class ScannerResultGenerator extends ResultGenerator {
044
045  private static final Logger LOG =
046    LoggerFactory.getLogger(ScannerResultGenerator.class);
047
048  public static Filter buildFilterFromModel(final ScannerModel model) 
049      throws Exception {
050    String filter = model.getFilter();
051    if (filter == null || filter.length() == 0) {
052      return null;
053    }
054    return buildFilter(filter);
055  }
056
057  private String id;
058  private Iterator<Cell> rowI;
059  private Cell cache;
060  private ResultScanner scanner;
061  private Result cached;
062
063  public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
064      final Filter filter, final boolean cacheBlocks)
065      throws IllegalArgumentException, IOException {
066    this(tableName, rowspec, filter, -1, cacheBlocks);
067  }
068
069  public ScannerResultGenerator(final String tableName, final RowSpec rowspec,
070      final Filter filter, final int caching, final boolean cacheBlocks)
071      throws IllegalArgumentException, IOException {
072    Table table = RESTServlet.getInstance().getTable(tableName);
073    try {
074      Scan scan;
075      if (rowspec.hasEndRow()) {
076        scan = new Scan(rowspec.getStartRow(), rowspec.getEndRow());
077      } else {
078        scan = new Scan(rowspec.getStartRow());
079      }
080      if (rowspec.hasColumns()) {
081        byte[][] columns = rowspec.getColumns();
082        for (byte[] column: columns) {
083          byte[][] split = CellUtil.parseColumn(column);
084          if (split.length == 1) {
085            scan.addFamily(split[0]);
086          } else if (split.length == 2) {
087            scan.addColumn(split[0], split[1]);
088          } else {
089            throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
090          }
091        }
092      }
093      scan.setTimeRange(rowspec.getStartTime(), rowspec.getEndTime());          
094      scan.setMaxVersions(rowspec.getMaxVersions());
095      if (filter != null) {
096        scan.setFilter(filter);
097      }
098      if (caching > 0 ) {
099        scan.setCaching(caching);
100      }
101      scan.setCacheBlocks(cacheBlocks);
102      if (rowspec.hasLabels()) {
103        scan.setAuthorizations(new Authorizations(rowspec.getLabels()));
104      }
105      scanner = table.getScanner(scan);
106      cached = null;
107      id = Long.toString(System.currentTimeMillis()) +
108             Integer.toHexString(scanner.hashCode());
109    } finally {
110      table.close();
111    }
112  }
113
114  public String getID() {
115    return id;
116  }
117
118  @Override
119  public void close() {
120    if (scanner != null) {
121      scanner.close();
122      scanner = null;
123    }
124  }
125
126  @Override
127  public boolean hasNext() {
128    if (cache != null) {
129      return true;
130    }
131    if (rowI != null && rowI.hasNext()) {
132      return true;
133    }
134    if (cached != null) {
135      return true;
136    }
137    try {
138      Result result = scanner.next();
139      if (result != null && !result.isEmpty()) {
140        cached = result;
141      }
142    } catch (UnknownScannerException e) {
143      throw new IllegalArgumentException(e);
144    } catch (IOException e) {
145      LOG.error(StringUtils.stringifyException(e));
146    }
147    return cached != null;
148  }
149
150  @Override
151  public Cell next() {
152    if (cache != null) {
153      Cell kv = cache;
154      cache = null;
155      return kv;
156    }
157    boolean loop;
158    do {
159      loop = false;
160      if (rowI != null) {
161        if (rowI.hasNext()) {
162          return rowI.next();
163        } else {
164          rowI = null;
165        }
166      }
167      if (cached != null) {
168        rowI = cached.listCells().iterator();
169        loop = true;
170        cached = null;
171      } else {
172        Result result = null;
173        try {
174          result = scanner.next();
175        } catch (UnknownScannerException e) {
176          throw new IllegalArgumentException(e);
177        } catch (TableNotEnabledException tnee) {
178          throw new IllegalStateException(tnee);
179        } catch (TableNotFoundException tnfe) {
180          throw new IllegalArgumentException(tnfe);
181        } catch (IOException e) {
182          LOG.error(StringUtils.stringifyException(e));
183        }
184        if (result != null && !result.isEmpty()) {
185          rowI = result.listCells().iterator();
186          loop = true;
187        }
188      }
189    } while (loop);
190    return null;
191  }
192
193  @Override
194  public void putBack(Cell kv) {
195    this.cache = kv;
196  }
197
198  @Override
199  public void remove() {
200    throw new UnsupportedOperationException("remove not supported");
201  }
202}