/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Checks that a {@link RuntimeException} raised while an {@link OutputSink} writer thread asks
 * its {@link EntryBuffers} for work ends up being reported by
 * {@link WALSplitter.PipelineController#checkForErrors()}, both for the initial writer thread and
 * after {@link OutputSink#restartWriterThreadsIfNeeded()} has been called.
 */
@Category({ RegionServerTests.class, SmallTests.class })
public class TestOutputSinkWriter {

  @ClassRule
  public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestOutputSinkWriter.class);

  /**
   * How long the test sleeps to let a writer thread hit the injected failure.
   * NOTE(review): the test is timing dependent; a slow machine may need longer.
   */
  private static final long WRITER_THREAD_WAIT_MS = 1000L;

  /**
   * Drives one failure, a restart, and a second failure through the sink and checks what
   * {@code checkForErrors()} reports after each step.
   * <p>
   * The method name keeps its original spelling so existing references to it stay valid.
   * @throws IOException          declared by the original test; propagated unchanged
   * @throws InterruptedException if the test thread is interrupted while sleeping
   */
  @Test
  public void testExeptionHandling() throws IOException, InterruptedException {
    WALSplitter.PipelineController controller = new WALSplitter.PipelineController();
    BrokenEntryBuffers entryBuffers = new BrokenEntryBuffers(controller, 2000);
    // Minimal sink: every abstract hook is stubbed out, only the writer-thread plumbing matters.
    // The trailing 1 is presumably the number of writer threads -- TODO confirm against OutputSink.
    OutputSink sink = new OutputSink(controller, entryBuffers, 1) {

      @Override
      protected int getNumOpenWriters() {
        return 0;
      }

      @Override
      protected void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
        // intentionally empty: nothing is ever written in this test
      }

      @Override
      protected List<Path> close() throws IOException {
        return null;
      }

      @Override
      public Map<String, Long> getOutputCounts() {
        return null;
      }

      @Override
      public int getNumberOfRecoveredRegions() {
        return 0;
      }

      @Override
      public boolean keepRegionEvent(WAL.Entry entry) {
        return false;
      }
    };

    // start the writer thread and give it time to throw the exception
    sink.startWriterThreads();
    Thread.sleep(WRITER_THREAD_WAIT_MS);

    // make sure the exception is stored
    assertErrorStored(controller);

    sink.restartWriterThreadsIfNeeded();

    // after the check the stored exception should be gone
    try {
      controller.checkForErrors();
    } catch (RuntimeException re) {
      // keep the unexpected exception as the cause so the failure is diagnosable
      throw new AssertionError("No error should be stored after the previous check", re);
    }

    // prep another exception and wait for it to be thrown
    entryBuffers.setThrowError(true);
    Thread.sleep(WRITER_THREAD_WAIT_MS);

    // make sure the exception is stored
    assertErrorStored(controller);
  }

  /**
   * Fails the test unless {@code controller.checkForErrors()} throws a {@link RuntimeException}.
   */
  private static void assertErrorStored(WALSplitter.PipelineController controller)
    throws IOException {
    try {
      controller.checkForErrors();
    } catch (RuntimeException expected) {
      // this is the outcome the test is looking for
      return;
    }
    Assert.fail("Expected checkForErrors() to rethrow the writer thread's exception");
  }

  /**
   * {@link EntryBuffers} whose {@link #getChunkToWrite()} throws once each time the
   * {@code throwError} flag is armed, and otherwise returns {@code null}.
   */
  static class BrokenEntryBuffers extends EntryBuffers {
    /** Armed initially; guarded by this instance's monitor. */
    boolean throwError = true;

    public BrokenEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) {
      super(controller, maxHeapUsage);
    }

    /**
     * Emulates something going wrong within the writer: throws when the flag is armed and
     * disarms it, so each arming produces exactly one exception.
     * @return always {@code null} when no exception is thrown
     */
    @Override
    synchronized EntryBuffers.RegionEntryBuffer getChunkToWrite() {
      if (throwError) {
        throwError = false;
        throw new RuntimeException("testing");
      }
      return null;
    }

    /**
     * Arms or disarms the injected failure. Synchronized on the same monitor as
     * {@link #getChunkToWrite()} so that a value set by the test thread is guaranteed to be
     * visible to the thread calling {@code getChunkToWrite()}.
     * @param newValue {@code true} to make the next {@code getChunkToWrite()} call throw
     */
    public synchronized void setThrowError(boolean newValue) {
      throwError = newValue;
    }
  }
}