001/**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements.  See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License.  You may obtain a copy of the License at
008 *
009 *      http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017package org.apache.activemq.transport.peer;
018
019import java.io.IOException;
020import java.net.URI;
021import java.net.URISyntaxException;
022import java.util.HashMap;
023import java.util.Map;
024import java.util.concurrent.ConcurrentHashMap;
025import java.util.concurrent.ConcurrentMap;
026
027import org.apache.activemq.broker.BrokerFactoryHandler;
028import org.apache.activemq.broker.BrokerService;
029import org.apache.activemq.broker.TransportConnector;
030import org.apache.activemq.transport.Transport;
031import org.apache.activemq.transport.TransportFactory;
032import org.apache.activemq.transport.TransportServer;
033import org.apache.activemq.transport.vm.VMTransportFactory;
034import org.apache.activemq.util.IOExceptionSupport;
035import org.apache.activemq.util.IdGenerator;
036import org.apache.activemq.util.IntrospectionSupport;
037import org.apache.activemq.util.URISupport;
038
039public class PeerTransportFactory extends TransportFactory {
040
041    public static final ConcurrentMap BROKERS = new ConcurrentHashMap();
042    public static final ConcurrentMap CONNECTORS = new ConcurrentHashMap();
043    public static final ConcurrentMap SERVERS = new ConcurrentHashMap();
044    private static final IdGenerator ID_GENERATOR = new IdGenerator("peer-");
045
046    @Override
047    public Transport doConnect(URI location) throws Exception {
048        VMTransportFactory vmTransportFactory = createTransportFactory(location);
049        return vmTransportFactory.doConnect(location);
050    }
051
052    @Override
053    public Transport doCompositeConnect(URI location) throws Exception {
054        VMTransportFactory vmTransportFactory = createTransportFactory(location);
055        return vmTransportFactory.doCompositeConnect(location);
056    }
057
058    /**
059     * @param location
060     * @return the converted URI
061     * @throws URISyntaxException
062     */
063    private VMTransportFactory createTransportFactory(URI location) throws IOException {
064        try {
065            String group = location.getHost();
066            String broker = URISupport.stripPrefix(location.getPath(), "/");
067
068            if (group == null) {
069                group = "default";
070            }
071            if (broker == null || broker.length() == 0) {
072                broker = ID_GENERATOR.generateSanitizedId();
073            }
074
075            final Map<String, String> brokerOptions = new HashMap<String, String>(URISupport.parseParameters(location));
076            if (!brokerOptions.containsKey("persistent")) {
077                brokerOptions.put("persistent", "false");
078            }
079
080            final URI finalLocation = new URI("vm://" + broker);
081            final String finalBroker = broker;
082            final String finalGroup = group;
083            VMTransportFactory rc = new VMTransportFactory() {
084                @Override
085                public Transport doConnect(URI ignore) throws Exception {
086                    return super.doConnect(finalLocation);
087                };
088
089                @Override
090                public Transport doCompositeConnect(URI ignore) throws Exception {
091                    return super.doCompositeConnect(finalLocation);
092                };
093            };
094            rc.setBrokerFactoryHandler(new BrokerFactoryHandler() {
095                @Override
096                public BrokerService createBroker(URI brokerURI) throws Exception {
097                    BrokerService service = new BrokerService();
098                    IntrospectionSupport.setProperties(service, brokerOptions);
099                    service.setBrokerName(finalBroker);
100                    TransportConnector c = service.addConnector("tcp://0.0.0.0:0");
101                    c.setDiscoveryUri(new URI("multicast://default?group=" + finalGroup));
102                    service.addNetworkConnector("multicast://default?group=" + finalGroup);
103                    return service;
104                }
105            });
106            return rc;
107
108        } catch (URISyntaxException e) {
109            throw IOExceptionSupport.create(e);
110        }
111    }
112
113    @Override
114    public TransportServer doBind(URI location) throws IOException {
115        throw new IOException("This protocol does not support being bound.");
116    }
117
118}