1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.rest;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25
26 import javax.ws.rs.DefaultValue;
27 import javax.ws.rs.GET;
28 import javax.ws.rs.HeaderParam;
29 import javax.ws.rs.PathParam;
30 import javax.ws.rs.Produces;
31 import javax.ws.rs.QueryParam;
32 import javax.ws.rs.core.Context;
33 import javax.ws.rs.core.Response;
34 import javax.ws.rs.core.Response.ResponseBuilder;
35 import javax.ws.rs.core.UriInfo;
36 import javax.xml.bind.annotation.XmlAccessType;
37 import javax.xml.bind.annotation.XmlAccessorType;
38 import javax.xml.bind.annotation.XmlElement;
39 import javax.xml.bind.annotation.XmlRootElement;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.hbase.classification.InterfaceAudience;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.client.Result;
47 import org.apache.hadoop.hbase.client.ResultScanner;
48 import org.apache.hadoop.hbase.rest.model.CellModel;
49 import org.apache.hadoop.hbase.rest.model.RowModel;
50 import org.codehaus.jackson.annotate.JsonIgnore;
51 import org.codehaus.jackson.annotate.JsonProperty;
52
53 @InterfaceAudience.Private
54 public class TableScanResource extends ResourceBase {
55
56 private static final Log LOG = LogFactory.getLog(TableScanResource.class);
57 TableResource tableResource;
58 ResultScanner results;
59 int userRequestedLimit;
60
61 public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
62 super();
63 this.results = scanner;
64 this.userRequestedLimit = userRequestedLimit;
65 }
66
67 @GET
68 @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
69 public CellSetModelStream get(final @Context UriInfo uriInfo) {
70 servlet.getMetrics().incrementRequests(1);
71 final int rowsToSend = userRequestedLimit;
72 servlet.getMetrics().incrementSucessfulScanRequests(1);
73 final Iterator<Result> itr = results.iterator();
74 return new CellSetModelStream(new ArrayList<RowModel>() {
75 public Iterator<RowModel> iterator() {
76 return new Iterator<RowModel>() {
77 int count = rowsToSend;
78
79 @Override
80 public boolean hasNext() {
81 if (count > 0) {
82 return itr.hasNext();
83 } else {
84 return false;
85 }
86 }
87
88 @Override
89 public void remove() {
90 throw new UnsupportedOperationException(
91 "Remove method cannot be used in CellSetModelStream");
92 }
93
94 @Override
95 public RowModel next() {
96 Result rs = itr.next();
97 if ((rs == null) || (count <= 0)) {
98 return null;
99 }
100 byte[] rowKey = rs.getRow();
101 RowModel rModel = new RowModel(rowKey);
102 List<Cell> kvs = rs.listCells();
103 for (Cell kv : kvs) {
104 rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
105 kv.getTimestamp(), CellUtil.cloneValue(kv)));
106 }
107 count--;
108 if (count == 0) {
109 results.close();
110 }
111 return rModel;
112 }
113 };
114 }
115 });
116 }
117
118 @GET
119 @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
120 public Response getProtobuf(
121 final @Context UriInfo uriInfo,
122 final @PathParam("scanspec") String scanSpec,
123 final @HeaderParam("Accept") String contentType,
124 @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
125 @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
126 @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
127 @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
128 @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
129 @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
130 @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
131 @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
132 @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
133 servlet.getMetrics().incrementRequests(1);
134 try {
135 int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
136 ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
137 userRequestedLimit, fetchSize);
138 servlet.getMetrics().incrementSucessfulScanRequests(1);
139 ResponseBuilder response = Response.ok(stream);
140 response.header("content-type", contentType);
141 return response.build();
142 } catch (Exception exp) {
143 servlet.getMetrics().incrementFailedScanRequests(1);
144 processException(exp);
145 LOG.warn(exp);
146 return null;
147 }
148 }
149
150 @XmlRootElement(name = "CellSet")
151 @XmlAccessorType(XmlAccessType.FIELD)
152 public static class CellSetModelStream {
153
154 @XmlElement(name = "Row")
155 @JsonIgnore
156 private ArrayList<RowModel> Row;
157
158 public CellSetModelStream() {
159 }
160
161 public CellSetModelStream(final ArrayList<RowModel> rowList) {
162 this.Row = rowList;
163 }
164
165
166 @JsonProperty("Row")
167 public Iterator<RowModel> getIterator() {
168 return Row.iterator();
169 }
170 }
171 }