001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.util;
018
019import java.io.IOException;
020import java.sql.SQLException;
021import java.util.concurrent.TimeUnit;
022import java.util.concurrent.atomic.AtomicBoolean;
023
024import org.apache.activemq.broker.BrokerService;
025import org.slf4j.Logger;
026import org.slf4j.LoggerFactory;
027
028/**
029 * @org.apache.xbean.XBean
030 */
031 public class DefaultIOExceptionHandler implements IOExceptionHandler {
032
033    private static final Logger LOG = LoggerFactory
034            .getLogger(DefaultIOExceptionHandler.class);
035    private BrokerService broker;
036    private boolean ignoreAllErrors = false;
037    private boolean ignoreNoSpaceErrors = true;
038    private boolean ignoreSQLExceptions = true;
039    private boolean stopStartConnectors = false;
040    private String noSpaceMessage = "space";
041    private String sqlExceptionMessage = ""; // match all
042    private long resumeCheckSleepPeriod = 5*1000;
043    private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044
045    public void handle(IOException exception) {
046        if (ignoreAllErrors) {
047            LOG.info("Ignoring IO exception, " + exception, exception);
048            return;
049        }
050
051        if (ignoreNoSpaceErrors) {
052            Throwable cause = exception;
053            while (cause != null && cause instanceof IOException) {
054                String message = cause.getMessage();
055                if (message != null && message.contains(noSpaceMessage)) {
056                    LOG.info("Ignoring no space left exception, " + exception, exception);
057                    return;
058                }
059                cause = cause.getCause();
060            }
061        }
062
063        if (ignoreSQLExceptions) {
064            Throwable cause = exception;
065            while (cause != null) {
066                String message = cause.getMessage();
067                if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) {
068                    LOG.info("Ignoring SQLException, " + exception, cause);
069                    return;
070                }
071                cause = cause.getCause();
072            }
073        }
074
075        if (stopStartConnectors) {
076            if (!stopStartInProgress.compareAndSet(false, true)) {
077                // we are already working on it
078                return;
079            }
080            LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
081
082            new Thread("stop transport connectors on IO exception") {
083                public void run() {
084                    try {
085                        ServiceStopper stopper = new ServiceStopper();
086                        broker.stopAllConnectors(stopper);
087                    } catch (Exception e) {
088                        LOG.warn("Failure occurred while stopping broker connectors", e);
089                    }
090                }
091            }.start();
092
093            // resume again
094            new Thread("restart transport connectors post IO exception") {
095                public void run() {
096                    try {
097                        while (isPersistenceAdapterDown()) {
098                            LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
099                            TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
100                        }
101                        broker.startAllConnectors();
102                    } catch (Exception e) {
103                        LOG.warn("Failure occurred while restarting broker connectors", e);
104                    } finally {
105                        stopStartInProgress.compareAndSet(true, false);
106                    }
107                }
108
109                private boolean isPersistenceAdapterDown() {
110                    boolean checkpointSuccess = false;
111                    try {
112                        broker.getPersistenceAdapter().checkpoint(true);
113                        checkpointSuccess = true;
114                    } catch (Throwable ignored) {}
115                    return !checkpointSuccess;
116                }
117            }.start();
118
119            return;
120        }
121
122        LOG.info("Stopping the broker due to IO exception, " + exception, exception);
123        new Thread("Stopping the broker due to IO exception") {
124            public void run() {
125                try {
126                    broker.stop();
127                } catch (Exception e) {
128                    LOG.warn("Failure occurred while stopping broker", e);
129                }
130            }
131        }.start();
132    }
133
134    public void setBrokerService(BrokerService broker) {
135        this.broker = broker;
136    }
137
138    public boolean isIgnoreAllErrors() {
139        return ignoreAllErrors;
140    }
141
142    public void setIgnoreAllErrors(boolean ignoreAllErrors) {
143        this.ignoreAllErrors = ignoreAllErrors;
144    }
145
146    public boolean isIgnoreNoSpaceErrors() {
147        return ignoreNoSpaceErrors;
148    }
149
150    public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
151        this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
152    }
153
154    public String getNoSpaceMessage() {
155        return noSpaceMessage;
156    }
157
158    public void setNoSpaceMessage(String noSpaceMessage) {
159        this.noSpaceMessage = noSpaceMessage;
160    }
161
162    public boolean isIgnoreSQLExceptions() {
163        return ignoreSQLExceptions;
164    }
165
166    public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
167        this.ignoreSQLExceptions = ignoreSQLExceptions;
168    }
169
170    public String getSqlExceptionMessage() {
171        return sqlExceptionMessage;
172    }
173
174    public void setSqlExceptionMessage(String sqlExceptionMessage) {
175        this.sqlExceptionMessage = sqlExceptionMessage;
176    }
177
178    public boolean isStopStartConnectors() {
179        return stopStartConnectors;
180    }
181
182    public void setStopStartConnectors(boolean stopStartConnectors) {
183        this.stopStartConnectors = stopStartConnectors;
184    }
185
186    public long getResumeCheckSleepPeriod() {
187        return resumeCheckSleepPeriod;
188    }
189
190    public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
191        this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
192    }
193}