View Javadoc

1   /*
2    * Copyright 2010 The Apache Software Foundation
3    *
4    * Licensed to the Apache Software Foundation (ASF) under one
5    * or more contributor license agreements.  See the NOTICE file
6    * distributed with this work for additional information
7    * regarding copyright ownership.  The ASF licenses this file
8    * to you under the Apache License, Version 2.0 (the
9    * "License"); you may not use this file except in compliance
10   * with the License.  You may obtain a copy of the License at
11   *
12   *     http://www.apache.org/licenses/LICENSE-2.0
13   *
14   * Unless required by applicable law or agreed to in writing, software
15   * distributed under the License is distributed on an "AS IS" BASIS,
16   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17   * See the License for the specific language governing permissions and
18   * limitations under the License.
19   */
20  
21  package org.apache.hadoop.hbase.client;
22  
23  import org.apache.hadoop.hbase.HConstants;
24  import org.apache.hadoop.hbase.filter.Filter;
25  import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
26  import org.apache.hadoop.hbase.io.TimeRange;
27  import org.apache.hadoop.hbase.util.Bytes;
28  import org.apache.hadoop.hbase.util.Classes;
29  import org.apache.hadoop.io.Writable;
30  
31  import java.io.DataInput;
32  import java.io.DataOutput;
33  import java.io.IOException;
34  import java.util.ArrayList;
35  import java.util.HashMap;
36  import java.util.List;
37  import java.util.Map;
38  import java.util.NavigableSet;
39  import java.util.TreeMap;
40  import java.util.TreeSet;
41  
42  /**
43   * Used to perform Scan operations.
44   * <p>
45   * All operations are identical to {@link Get} with the exception of
46   * instantiation.  Rather than specifying a single row, an optional startRow
47   * and stopRow may be defined.  If rows are not specified, the Scanner will
48   * iterate over all rows.
49   * <p>
50   * To scan everything for each row, instantiate a Scan object.
51   * <p>
52   * To modify scanner caching for just this scan, use {@link #setCaching(int) setCaching}.
53   * If caching is NOT set, we will use the caching value of the hosting
54   * {@link HTable}.  See {@link HTable#setScannerCaching(int)}.
55   * <p>
56   * To further define the scope of what to get when scanning, perform additional
57   * methods as outlined below.
58   * <p>
59   * To get all columns from specific families, execute {@link #addFamily(byte[]) addFamily}
60   * for each family to retrieve.
61   * <p>
62   * To get specific columns, execute {@link #addColumn(byte[], byte[]) addColumn}
63   * for each column to retrieve.
64   * <p>
65   * To only retrieve columns within a specific range of version timestamps,
66   * execute {@link #setTimeRange(long, long) setTimeRange}.
67   * <p>
68   * To only retrieve columns with a specific timestamp, execute
69   * {@link #setTimeStamp(long) setTimeStamp}.
70   * <p>
71   * To limit the number of versions of each column to be returned, execute
72   * {@link #setMaxVersions(int) setMaxVersions}.
73   * <p>
74   * To limit the maximum number of values returned for each call to next(),
75   * execute {@link #setBatch(int) setBatch}.
76   * <p>
77   * To add a filter, execute {@link #setFilter(org.apache.hadoop.hbase.filter.Filter) setFilter}.
78   * <p>
79   * Expert: To explicitly disable server-side block caching for this scan,
80   * execute {@link #setCacheBlocks(boolean)}.
81   */
82  public class Scan extends OperationWithAttributes implements Writable {
83    private static final String RAW_ATTR = "_raw_";
84    private static final String ONDEMAND_ATTR = "_ondemand_";
85    private static final String ISOLATION_LEVEL = "_isolationlevel_";
86  
87    /** Scan Hints */
88    private static final String SMALL_ATTR = "_small_";
89    /**
90     * EXPERT ONLY.
91     * An integer (not long) indicating to the scanner logic how many times we attempt to retrieve the
92     * next KV before we schedule a reseek.
93     * The right value depends on the size of the average KV. A reseek is more efficient when
94     * it can skip 5-10 KVs or 512B-1KB, or when the next KV is likely found in another HFile block.
95     * Setting this only has any effect when columns were added with
96     * {@link #addColumn(byte[], byte[])}
97     * <pre>{@code
98     * Scan s = new Scan(...);
99     * s.addColumn(...);
100    * s.setAttribute(Scan.HINT_LOOKAHEAD, Bytes.toBytes(2));
101    * }</pre>
102    * Default is 0 (always reseek).
103    */
104   public static final String HINT_LOOKAHEAD = "_look_ahead_";
105 
106   private static final byte SCAN_VERSION = (byte)2;
107   private byte [] startRow = HConstants.EMPTY_START_ROW;
108   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
109   private int maxVersions = 1;
110   private int batch = -1;
111   // If application wants to collect scan metrics, it needs to
112   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
113   static public String SCAN_ATTRIBUTES_METRICS_ENABLE = "scan.attributes.metrics.enable";
114   static public String SCAN_ATTRIBUTES_METRICS_DATA = "scan.attributes.metrics.data";
115   
116   // If an application wants to use multiple scans over different tables each scan must
117   // define this attribute with the appropriate table name by calling
118   // scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(tableName))
119   static public final String SCAN_ATTRIBUTES_TABLE_NAME = "scan.attributes.table.name";
120 
121   /*
122    * -1 means no caching
123    */
124   private int caching = -1;
125   private boolean cacheBlocks = true;
126   private Filter filter = null;
127   private TimeRange tr = new TimeRange();
128   private Map<byte [], NavigableSet<byte []>> familyMap =
129     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
130 
131   /**
132    * Create a Scan operation across all rows.
133    */
134   public Scan() {}
135 
136   public Scan(byte [] startRow, Filter filter) {
137     this(startRow);
138     this.filter = filter;
139   }
140 
141   /**
142    * Create a Scan operation starting at the specified row.
143    * <p>
144    * If the specified row does not exist, the Scanner will start from the
145    * next closest row after the specified row.
146    * @param startRow row to start scanner at or after
147    */
148   public Scan(byte [] startRow) {
149     this.startRow = startRow;
150   }
151 
152   /**
153    * Create a Scan operation for the range of rows specified.
154    * @param startRow row to start scanner at or after (inclusive)
155    * @param stopRow row to stop scanner before (exclusive)
156    */
157   public Scan(byte [] startRow, byte [] stopRow) {
158     this.startRow = startRow;
159     this.stopRow = stopRow;
160   }
161 
162   /**
163    * Creates a new instance of this class while copying all values.
164    *
165    * @param scan  The scan instance to copy from.
166    * @throws IOException When copying the values fails.
167    */
168   public Scan(Scan scan) throws IOException {
169     startRow = scan.getStartRow();
170     stopRow  = scan.getStopRow();
171     maxVersions = scan.getMaxVersions();
172     batch = scan.getBatch();
173     caching = scan.getCaching();
174     cacheBlocks = scan.getCacheBlocks();
175     filter = scan.getFilter(); // clone?
176     TimeRange ctr = scan.getTimeRange();
177     tr = new TimeRange(ctr.getMin(), ctr.getMax());
178     Map<byte[], NavigableSet<byte[]>> fams = scan.getFamilyMap();
179     for (Map.Entry<byte[],NavigableSet<byte[]>> entry : fams.entrySet()) {
180       byte [] fam = entry.getKey();
181       NavigableSet<byte[]> cols = entry.getValue();
182       if (cols != null && cols.size() > 0) {
183         for (byte[] col : cols) {
184           addColumn(fam, col);
185         }
186       } else {
187         addFamily(fam);
188       }
189     }
190     for (Map.Entry<String, byte[]> attr : scan.getAttributesMap().entrySet()) {
191       setAttribute(attr.getKey(), attr.getValue());
192     }
193   }
194 
195   /**
196    * Builds a scan object with the same specs as get.
197    * @param get get to model scan after
198    */
199   public Scan(Get get) {
200     this.startRow = get.getRow();
201     this.stopRow = get.getRow();
202     this.filter = get.getFilter();
203     this.cacheBlocks = get.getCacheBlocks();
204     this.maxVersions = get.getMaxVersions();
205     this.tr = get.getTimeRange();
206     this.familyMap = get.getFamilyMap();
207   }
208 
209   public boolean isGetScan() {
210     return this.startRow != null && this.startRow.length > 0 &&
211       Bytes.equals(this.startRow, this.stopRow);
212   }
213 
214   /**
215    * Get all columns from the specified family.
216    * <p>
217    * Overrides previous calls to addColumn for this family.
218    * @param family family name
219    * @return this
220    */
221   public Scan addFamily(byte [] family) {
222     familyMap.remove(family);
223     familyMap.put(family, null);
224     return this;
225   }
226 
227   /**
228    * Get the column from the specified family with the specified qualifier.
229    * <p>
230    * Overrides previous calls to addFamily for this family.
231    * @param family family name
232    * @param qualifier column qualifier
233    * @return this
234    */
235   public Scan addColumn(byte [] family, byte [] qualifier) {
236     NavigableSet<byte []> set = familyMap.get(family);
237     if(set == null) {
238       set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
239     }
240     if (qualifier == null) {
241       qualifier = HConstants.EMPTY_BYTE_ARRAY;
242     }
243     set.add(qualifier);
244     familyMap.put(family, set);
245 
246     return this;
247   }
248 
249   /**
250    * Get versions of columns only within the specified timestamp range,
251    * [minStamp, maxStamp).  Note, default maximum versions to return is 1.  If
252    * your time range spans more than one version and you want all versions
253    * returned, up the number of versions beyond the defaut.
254    * @param minStamp minimum timestamp value, inclusive
255    * @param maxStamp maximum timestamp value, exclusive
256    * @throws IOException if invalid time range
257    * @see #setMaxVersions()
258    * @see #setMaxVersions(int)
259    * @return this
260    */
261   public Scan setTimeRange(long minStamp, long maxStamp)
262   throws IOException {
263     tr = new TimeRange(minStamp, maxStamp);
264     return this;
265   }
266 
267   /**
268    * Get versions of columns with the specified timestamp. Note, default maximum
269    * versions to return is 1.  If your time range spans more than one version
270    * and you want all versions returned, up the number of versions beyond the
271    * defaut.
272    * @param timestamp version timestamp
273    * @see #setMaxVersions()
274    * @see #setMaxVersions(int)
275    * @return this
276    */
277   public Scan setTimeStamp(long timestamp) {
278     try {
279       tr = new TimeRange(timestamp, timestamp+1);
280     } catch(IOException e) {
281       // Will never happen
282     }
283     return this;
284   }
285 
286   /**
287    * Set the start row of the scan.
288    * @param startRow row to start scan on (inclusive)
289    * Note: In order to make startRow exclusive add a trailing 0 byte
290    * @return this
291    */
292   public Scan setStartRow(byte [] startRow) {
293     this.startRow = startRow == null ? HConstants.EMPTY_START_ROW : startRow;
294     return this;
295   }
296 
297   /**
298    * Set the stop row.
299    * @param stopRow row to end at (exclusive)
300    * Note: In order to make stopRow inclusive add a trailing 0 byte
301    * @return this
302    */
303   public Scan setStopRow(byte [] stopRow) {
304     this.stopRow = stopRow == null ? HConstants.EMPTY_END_ROW : stopRow;
305     return this;
306   }
307 
308   /**
309    * Get all available versions.
310    * @return this
311    */
312   public Scan setMaxVersions() {
313     this.maxVersions = Integer.MAX_VALUE;
314     return this;
315   }
316 
317   /**
318    * Get up to the specified number of versions of each column.
319    * @param maxVersions maximum versions for each column
320    * @return this
321    */
322   public Scan setMaxVersions(int maxVersions) {
323     this.maxVersions = maxVersions;
324     return this;
325   }
326 
327   /**
328    * Set the maximum number of values to return for each call to next()
329    * @param batch the maximum number of values
330    */
331   public void setBatch(int batch) {
332     if (this.hasFilter() && this.filter.hasFilterRow()) {
333       throw new IncompatibleFilterException(
334         "Cannot set batch on a scan using a filter" +
335         " that returns true for filter.hasFilterRow");
336     }
337     this.batch = batch;
338   }
339 
340   /**
341    * Set the number of rows for caching that will be passed to scanners.
342    * If not set, the default setting from {@link HTable#getScannerCaching()} will apply.
343    * Higher caching values will enable faster scanners but will use more memory.
344    * @param caching the number of rows for caching
345    */
346   public void setCaching(int caching) {
347     this.caching = caching;
348   }
349 
350   /**
351    * Apply the specified server-side filter when performing the Scan.
352    * @param filter filter to run on the server
353    * @return this
354    */
355   public Scan setFilter(Filter filter) {
356     this.filter = filter;
357     return this;
358   }
359 
360   /**
361    * Setting the familyMap
362    * @param familyMap map of family to qualifier
363    * @return this
364    */
365   public Scan setFamilyMap(Map<byte [], NavigableSet<byte []>> familyMap) {
366     this.familyMap = familyMap;
367     return this;
368   }
369 
370   /**
371    * Getting the familyMap
372    * @return familyMap
373    */
374   public Map<byte [], NavigableSet<byte []>> getFamilyMap() {
375     return this.familyMap;
376   }
377 
378   /**
379    * @return the number of families in familyMap
380    */
381   public int numFamilies() {
382     if(hasFamilies()) {
383       return this.familyMap.size();
384     }
385     return 0;
386   }
387 
388   /**
389    * @return true if familyMap is non empty, false otherwise
390    */
391   public boolean hasFamilies() {
392     return !this.familyMap.isEmpty();
393   }
394 
395   /**
396    * @return the keys of the familyMap
397    */
398   public byte[][] getFamilies() {
399     if(hasFamilies()) {
400       return this.familyMap.keySet().toArray(new byte[0][0]);
401     }
402     return null;
403   }
404 
405   /**
406    * @return the startrow
407    */
408   public byte [] getStartRow() {
409     return this.startRow;
410   }
411 
412   /**
413    * @return the stoprow
414    */
415   public byte [] getStopRow() {
416     return this.stopRow;
417   }
418 
419   /**
420    * @return the max number of versions to fetch
421    */
422   public int getMaxVersions() {
423     return this.maxVersions;
424   }
425 
426   /**
427    * @return maximum number of values to return for a single call to next()
428    */
429   public int getBatch() {
430     return this.batch;
431   }
432 
433   /**
434    * @return caching the number of rows fetched when calling next on a scanner
435    */
436   public int getCaching() {
437     return this.caching;
438   }
439 
440   /**
441    * @return TimeRange
442    */
443   public TimeRange getTimeRange() {
444     return this.tr;
445   }
446 
447   /**
448    * @return RowFilter
449    */
450   public Filter getFilter() {
451     return filter;
452   }
453 
454   /**
455    * @return true is a filter has been specified, false if not
456    */
457   public boolean hasFilter() {
458     return filter != null;
459   }
460 
461   /**
462    * Set whether blocks should be cached for this Scan.
463    * <p>
464    * This is true by default.  When true, default settings of the table and
465    * family are used (this will never override caching blocks if the block
466    * cache is disabled for that family or entirely).
467    *
468    * @param cacheBlocks if false, default settings are overridden and blocks
469    * will not be cached
470    */
471   public void setCacheBlocks(boolean cacheBlocks) {
472     this.cacheBlocks = cacheBlocks;
473   }
474 
475   /**
476    * Get whether blocks should be cached for this Scan.
477    * @return true if default caching should be used, false if blocks should not
478    * be cached
479    */
480   public boolean getCacheBlocks() {
481     return cacheBlocks;
482   }
483 
484   /**
485    * Set the value indicating whether loading CFs on demand should be allowed (cluster
486    * default is false). On-demand CF loading doesn't load column families until necessary, e.g.
487    * if you filter on one column, the other column family data will be loaded only for the rows
488    * that are included in result, not all rows like in normal case.
489    * With column-specific filters, like SingleColumnValueFilter w/filterIfMissing == true,
490    * this can deliver huge perf gains when there's a cf with lots of data; however, it can
491    * also lead to some inconsistent results, as follows:
492    * - if someone does a concurrent update to both column families in question you may get a row
493    *   that never existed, e.g. for { rowKey = 5, { cat_videos => 1 }, { video => "my cat" } }
494    *   someone puts rowKey 5 with { cat_videos => 0 }, { video => "my dog" }, concurrent scan
495    *   filtering on "cat_videos == 1" can get { rowKey = 5, { cat_videos => 1 },
496    *   { video => "my dog" } }.
497    * - if there's a concurrent split and you have more than 2 column families, some rows may be
498    *   missing some column families.
499    */
500   public void setLoadColumnFamiliesOnDemand(boolean value) {
501     setAttribute(ONDEMAND_ATTR, Bytes.toBytes(value));
502   }
503 
504   /**
505    * Get the logical value indicating whether on-demand CF loading should be allowed.
506    */
507   public boolean doLoadColumnFamiliesOnDemand() {
508     byte[] attr = getAttribute(ONDEMAND_ATTR);
509     return attr == null ? false : Bytes.toBoolean(attr);
510   }
511 
512   /**
513    * Set whether this scan is a small scan
514    * <p>
515    * Small scan should use pread and big scan can use seek + read
516    * 
517    * seek + read is fast but can cause two problem (1) resource contention (2)
518    * cause too much network io
519    * 
520    * [89-fb] Using pread for non-compaction read request
521    * https://issues.apache.org/jira/browse/HBASE-7266
522    * 
523    * On the other hand, if setting it true, we would do
524    * openScanner,next,closeScanner in one RPC call. It means the better
525    * performance for small scan. [HBASE-9488].
526    * 
527    * Generally, if the scan range is within one data block(64KB), it could be
528    * considered as a small scan.
529    * 
530    * @param small
531    */
532   public void setSmall(boolean small) {
533     setAttribute(SMALL_ATTR, Bytes.toBytes(small));
534   }
535 
536   /**
537    * Get whether this scan is a small scan
538    * @return true if small scan
539    */
540   public boolean isSmall() {
541     byte[] attr = getAttribute(SMALL_ATTR);
542     return attr == null ? false : Bytes.toBoolean(attr);
543   }
544 
545   /**
546    * Compile the table and column family (i.e. schema) information
547    * into a String. Useful for parsing and aggregation by debugging,
548    * logging, and administration tools.
549    * @return Map
550    */
551   @Override
552   public Map<String, Object> getFingerprint() {
553     Map<String, Object> map = new HashMap<String, Object>();
554     List<String> families = new ArrayList<String>();
555     if(this.familyMap.size() == 0) {
556       map.put("families", "ALL");
557       return map;
558     } else {
559       map.put("families", families);
560     }
561     for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
562         this.familyMap.entrySet()) {
563       families.add(Bytes.toStringBinary(entry.getKey()));
564     }
565     return map;
566   }
567 
568   /**
569    * Compile the details beyond the scope of getFingerprint (row, columns,
570    * timestamps, etc.) into a Map along with the fingerprinted information.
571    * Useful for debugging, logging, and administration tools.
572    * @param maxCols a limit on the number of columns output prior to truncation
573    * @return Map
574    */
575   @Override
576   public Map<String, Object> toMap(int maxCols) {
577     // start with the fingerpring map and build on top of it
578     Map<String, Object> map = getFingerprint();
579     // map from families to column list replaces fingerprint's list of families
580     Map<String, List<String>> familyColumns =
581       new HashMap<String, List<String>>();
582     map.put("families", familyColumns);
583     // add scalar information first
584     map.put("startRow", Bytes.toStringBinary(this.startRow));
585     map.put("stopRow", Bytes.toStringBinary(this.stopRow));
586     map.put("maxVersions", this.maxVersions);
587     map.put("batch", this.batch);
588     map.put("caching", this.caching);
589     map.put("cacheBlocks", this.cacheBlocks);
590     List<Long> timeRange = new ArrayList<Long>();
591     timeRange.add(this.tr.getMin());
592     timeRange.add(this.tr.getMax());
593     map.put("timeRange", timeRange);
594     int colCount = 0;
595     // iterate through affected families and list out up to maxCols columns
596     for (Map.Entry<byte [], NavigableSet<byte[]>> entry :
597       this.familyMap.entrySet()) {
598       List<String> columns = new ArrayList<String>();
599       familyColumns.put(Bytes.toStringBinary(entry.getKey()), columns);
600       if(entry.getValue() == null) {
601         colCount++;
602         --maxCols;
603         columns.add("ALL");
604       } else {
605         colCount += entry.getValue().size();
606         if (maxCols <= 0) {
607           continue;
608         } 
609         for (byte [] column : entry.getValue()) {
610           if (--maxCols <= 0) {
611             continue;
612           }
613           columns.add(Bytes.toStringBinary(column));
614         }
615       } 
616     }       
617     map.put("totalColumns", colCount);
618     if (this.filter != null) {
619       map.put("filter", this.filter.toString());
620     }
621     // add the id if set
622     if (getId() != null) {
623       map.put("id", getId());
624     }
625     return map;
626   }
627 
628   //Writable
629   public void readFields(final DataInput in)
630   throws IOException {
631     int version = in.readByte();
632     if (version > (int)SCAN_VERSION) {
633       throw new IOException("version not supported");
634     }
635     this.startRow = Bytes.readByteArray(in);
636     this.stopRow = Bytes.readByteArray(in);
637     this.maxVersions = in.readInt();
638     this.batch = in.readInt();
639     this.caching = in.readInt();
640     this.cacheBlocks = in.readBoolean();
641     if(in.readBoolean()) {
642       this.filter = Classes.createWritableForName(
643         Bytes.toString(Bytes.readByteArray(in)));
644       this.filter.readFields(in);
645     }
646     this.tr = new TimeRange();
647     tr.readFields(in);
648     int numFamilies = in.readInt();
649     this.familyMap =
650       new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
651     for(int i=0; i<numFamilies; i++) {
652       byte [] family = Bytes.readByteArray(in);
653       int numColumns = in.readInt();
654       TreeSet<byte []> set = new TreeSet<byte []>(Bytes.BYTES_COMPARATOR);
655       for(int j=0; j<numColumns; j++) {
656         byte [] qualifier = Bytes.readByteArray(in);
657         set.add(qualifier);
658       }
659       this.familyMap.put(family, set);
660     }
661 
662     if (version > 1) {
663       readAttributes(in);
664     }
665   }
666 
667   public void write(final DataOutput out)
668   throws IOException {
669     out.writeByte(SCAN_VERSION);
670     Bytes.writeByteArray(out, this.startRow);
671     Bytes.writeByteArray(out, this.stopRow);
672     out.writeInt(this.maxVersions);
673     out.writeInt(this.batch);
674     out.writeInt(this.caching);
675     out.writeBoolean(this.cacheBlocks);
676     if(this.filter == null) {
677       out.writeBoolean(false);
678     } else {
679       out.writeBoolean(true);
680       Bytes.writeByteArray(out, Bytes.toBytes(filter.getClass().getName()));
681       filter.write(out);
682     }
683     tr.write(out);
684     out.writeInt(familyMap.size());
685     for(Map.Entry<byte [], NavigableSet<byte []>> entry : familyMap.entrySet()) {
686       Bytes.writeByteArray(out, entry.getKey());
687       NavigableSet<byte []> columnSet = entry.getValue();
688       if(columnSet != null){
689         out.writeInt(columnSet.size());
690         for(byte [] qualifier : columnSet) {
691           Bytes.writeByteArray(out, qualifier);
692         }
693       } else {
694         out.writeInt(0);
695       }
696     }
697     writeAttributes(out);
698   }
699 
700   /**
701    * Enable/disable "raw" mode for this scan.
702    * If "raw" is enabled the scan will return all
703    * delete marker and deleted rows that have not
704    * been collected, yet.
705    * This is mostly useful for Scan on column families
706    * that have KEEP_DELETED_ROWS enabled.
707    * It is an error to specify any column when "raw" is set.
708    * @param raw True/False to enable/disable "raw" mode.
709    */
710   public void setRaw(boolean raw) {
711     setAttribute(RAW_ATTR, Bytes.toBytes(raw));
712   }
713 
714   /**
715    * @return True if this Scan is in "raw" mode.
716    */
717   public boolean isRaw() {
718     byte[] attr = getAttribute(RAW_ATTR);
719     return attr == null ? false : Bytes.toBoolean(attr);
720   }
721 
722   /*
723    * Set the isolation level for this scan. If the
724    * isolation level is set to READ_UNCOMMITTED, then
725    * this scan will return data from committed and
726    * uncommitted transactions. If the isolation level 
727    * is set to READ_COMMITTED, then this scan will return 
728    * data from committed transactions only. If a isolation
729    * level is not explicitly set on a Scan, then it 
730    * is assumed to be READ_COMMITTED.
731    * @param level IsolationLevel for this scan
732    */
733   public void setIsolationLevel(IsolationLevel level) {
734     setAttribute(ISOLATION_LEVEL, level.toBytes());
735   }
736   /*
737    * @return The isolation level of this scan.
738    * If no isolation level was set for this scan object, 
739    * then it returns READ_COMMITTED.
740    * @return The IsolationLevel for this scan
741    */
742   public IsolationLevel getIsolationLevel() {
743     byte[] attr = getAttribute(ISOLATION_LEVEL);
744     return attr == null ? IsolationLevel.READ_COMMITTED :
745                           IsolationLevel.fromBytes(attr);
746   }
747 }