1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19 package org.apache.hadoop.hbase.rest;
20
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.Iterator;
24 import java.util.List;
25
26 import javax.ws.rs.DefaultValue;
27 import javax.ws.rs.GET;
28 import javax.ws.rs.HeaderParam;
29 import javax.ws.rs.PathParam;
30 import javax.ws.rs.Produces;
31 import javax.ws.rs.QueryParam;
32 import javax.ws.rs.core.Context;
33 import javax.ws.rs.core.Response;
34 import javax.ws.rs.core.Response.ResponseBuilder;
35 import javax.ws.rs.core.UriInfo;
36 import javax.xml.bind.annotation.XmlAccessType;
37 import javax.xml.bind.annotation.XmlAccessorType;
38 import javax.xml.bind.annotation.XmlElement;
39 import javax.xml.bind.annotation.XmlRootElement;
40
41 import org.apache.commons.logging.Log;
42 import org.apache.commons.logging.LogFactory;
43 import org.apache.hadoop.hbase.classification.InterfaceAudience;
44 import org.apache.hadoop.hbase.Cell;
45 import org.apache.hadoop.hbase.CellUtil;
46 import org.apache.hadoop.hbase.client.Result;
47 import org.apache.hadoop.hbase.client.ResultScanner;
48 import org.apache.hadoop.hbase.rest.model.CellModel;
49 import org.apache.hadoop.hbase.rest.model.RowModel;
50 import org.codehaus.jackson.annotate.JsonIgnore;
51 import org.codehaus.jackson.annotate.JsonProperty;
52
53 @InterfaceAudience.Private
54 public class TableScanResource extends ResourceBase {
55
56 private static final Log LOG = LogFactory.getLog(TableScanResource.class);
57 TableResource tableResource;
58 ResultScanner results;
59 int userRequestedLimit;
60
61 public TableScanResource(ResultScanner scanner, int userRequestedLimit) throws IOException {
62 super();
63 this.results = scanner;
64 this.userRequestedLimit = userRequestedLimit;
65 }
66
67 @GET
68 @Produces({ Constants.MIMETYPE_XML, Constants.MIMETYPE_JSON })
69 public CellSetModelStream get(final @Context UriInfo uriInfo) {
70 servlet.getMetrics().incrementRequests(1);
71 final int rowsToSend = userRequestedLimit;
72 servlet.getMetrics().incrementSucessfulScanRequests(1);
73 final Iterator<Result> itr = results.iterator();
74 return new CellSetModelStream(new ArrayList<RowModel>() {
75 public Iterator<RowModel> iterator() {
76 return new Iterator<RowModel>() {
77 int count = rowsToSend;
78
79 @Override
80 public boolean hasNext() {
81 if (count > 0) {
82 return itr.hasNext();
83 } else {
84 return false;
85 }
86 }
87
88 @Override
89 public void remove() {
90 throw new UnsupportedOperationException(
91 "Remove method cannot be used in CellSetModelStream");
92 }
93
94 @Override
95 public RowModel next() {
96 Result rs = itr.next();
97 if ((rs == null) || (count <= 0)) {
98 return null;
99 }
100 byte[] rowKey = rs.getRow();
101 RowModel rModel = new RowModel(rowKey);
102 List<Cell> kvs = rs.listCells();
103 for (Cell kv : kvs) {
104 rModel.addCell(new CellModel(CellUtil.cloneFamily(kv), CellUtil.cloneQualifier(kv),
105 kv.getTimestamp(), CellUtil.cloneValue(kv)));
106 }
107 count--;
108 return rModel;
109 }
110 };
111 }
112 });
113 }
114
115 @GET
116 @Produces({ Constants.MIMETYPE_PROTOBUF, Constants.MIMETYPE_PROTOBUF_IETF })
117 public Response getProtobuf(
118 final @Context UriInfo uriInfo,
119 final @PathParam("scanspec") String scanSpec,
120 final @HeaderParam("Accept") String contentType,
121 @DefaultValue(Integer.MAX_VALUE + "") @QueryParam(Constants.SCAN_LIMIT) int userRequestedLimit,
122 @DefaultValue("") @QueryParam(Constants.SCAN_START_ROW) String startRow,
123 @DefaultValue("") @QueryParam(Constants.SCAN_END_ROW) String endRow,
124 @DefaultValue("column") @QueryParam(Constants.SCAN_COLUMN) List<String> column,
125 @DefaultValue("1") @QueryParam(Constants.SCAN_MAX_VERSIONS) int maxVersions,
126 @DefaultValue("-1") @QueryParam(Constants.SCAN_BATCH_SIZE) int batchSize,
127 @DefaultValue("0") @QueryParam(Constants.SCAN_START_TIME) long startTime,
128 @DefaultValue(Long.MAX_VALUE + "") @QueryParam(Constants.SCAN_END_TIME) long endTime,
129 @DefaultValue("true") @QueryParam(Constants.SCAN_BATCH_SIZE) boolean cacheBlocks) {
130 servlet.getMetrics().incrementRequests(1);
131 try {
132 int fetchSize = this.servlet.getConfiguration().getInt(Constants.SCAN_FETCH_SIZE, 10);
133 ProtobufStreamingUtil stream = new ProtobufStreamingUtil(this.results, contentType,
134 userRequestedLimit, fetchSize);
135 servlet.getMetrics().incrementSucessfulScanRequests(1);
136 ResponseBuilder response = Response.ok(stream);
137 response.header("content-type", contentType);
138 return response.build();
139 } catch (Exception exp) {
140 servlet.getMetrics().incrementFailedScanRequests(1);
141 processException(exp);
142 LOG.warn(exp);
143 return null;
144 }
145 }
146
147 @XmlRootElement(name = "CellSet")
148 @XmlAccessorType(XmlAccessType.FIELD)
149 public static class CellSetModelStream {
150
151 @XmlElement(name = "Row")
152 @JsonIgnore
153 private ArrayList<RowModel> Row;
154
155 public CellSetModelStream() {
156 }
157
158 public CellSetModelStream(final ArrayList<RowModel> rowList) {
159 this.Row = rowList;
160 }
161
162
163 @JsonProperty("Row")
164 public Iterator<RowModel> getIterator() {
165 return Row.iterator();
166 }
167 }
168 }