|
|||||||||||||||||||
Source file | Conditionals | Statements | Methods | TOTAL | |||||||||||||||
JavaGroupsBroadcastingListener.java | 0% | 0% | 0% | 0% |
|
1 |
/*
|
|
2 |
* Copyright (c) 2002-2003 by OpenSymphony
|
|
3 |
* All rights reserved.
|
|
4 |
*/
|
|
5 |
package com.opensymphony.oscache.plugins.clustersupport;
|
|
6 |
|
|
7 |
import com.opensymphony.oscache.base.Cache;
|
|
8 |
import com.opensymphony.oscache.base.Config;
|
|
9 |
import com.opensymphony.oscache.base.FinalizationException;
|
|
10 |
import com.opensymphony.oscache.base.InitializationException;
|
|
11 |
|
|
12 |
import org.apache.commons.logging.Log;
|
|
13 |
import org.apache.commons.logging.LogFactory;
|
|
14 |
|
|
15 |
import org.javagroups.Address;
|
|
16 |
import org.javagroups.Channel;
|
|
17 |
|
|
18 |
import org.javagroups.blocks.NotificationBus;
|
|
19 |
|
|
20 |
import java.io.Serializable;
|
|
21 |
|
|
22 |
/**
|
|
23 |
* <p>A concrete implementation of the {@link AbstractBroadcastingListener} based on
|
|
24 |
* the JavaGroups library. This Class uses JavaGroups to broadcast cache flush
|
|
25 |
* messages across a cluster.</p>
|
|
26 |
*
|
|
27 |
* <p>One of the following properties should be configured in <code>oscache.properties</code> for
|
|
28 |
* this listener:
|
|
29 |
* <ul>
|
|
30 |
* <li><b>cache.cluster.multicast.ip</b> - The multicast IP that JavaGroups should use for broadcasting</li>
|
|
31 |
* <li><b>cache.cluster.properties</b> - The JavaGroups channel properties to use. Allows for precise
|
|
32 |
* control over the behaviour of JavaGroups</li>
|
|
33 |
* </ul>
|
|
34 |
* Please refer to the clustering documentation for further details on the configuration of this listener.</p>
|
|
35 |
*
|
|
36 |
* @author <a href="mailto:chris@swebtec.com">Chris Miller</a>
|
|
37 |
*/
|
|
38 |
public class JavaGroupsBroadcastingListener extends AbstractBroadcastingListener implements NotificationBus.Consumer { |
|
39 |
private final static Log log = LogFactory.getLog(JavaGroupsBroadcastingListener.class); |
|
40 |
private static final String BUS_NAME = "OSCacheBus"; |
|
41 |
private static final String CHANNEL_PROPERTIES = "cache.cluster.properties"; |
|
42 |
private static final String MULTICAST_IP_PROPERTY = "cache.cluster.multicast.ip"; |
|
43 |
|
|
44 |
/**
|
|
45 |
* The first half of the default channel properties. They default channel properties are:
|
|
46 |
* <pre>
|
|
47 |
* UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
|
|
48 |
* </pre>
|
|
49 |
*
|
|
50 |
*
|
|
51 |
* Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
|
|
52 |
*/
|
|
53 |
private static final String DEFAULT_CHANNEL_PROPERTIES_PRE = "UDP(mcast_addr="; |
|
54 |
|
|
55 |
/**
|
|
56 |
* The second half of the default channel properties. They default channel properties are:
|
|
57 |
* <pre>
|
|
58 |
* UDP(mcast_addr=*.*.*.*;mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)
|
|
59 |
* </pre>
|
|
60 |
* Where <code>*.*.*.*</code> is the specified multicast IP, which defaults to <code>231.12.21.132</code>.
|
|
61 |
*/
|
|
62 |
private static final String DEFAULT_CHANNEL_PROPERTIES_POST = ";mcast_port=45566;ip_ttl=32;mcast_send_buf_size=150000;mcast_recv_buf_size=80000):PING(timeout=2000;num_initial_members=3):MERGE2(min_interval=5000;max_interval=10000):FD_SOCK:VERIFY_SUSPECT(timeout=1500):pbcast.NAKACK(gc_lag=50;retransmit_timeout=300,600,1200,2400,4800):pbcast.STABLE(desired_avg_gossip=20000):UNICAST(timeout=5000):FRAG(frag_size=8096;down_thread=false;up_thread=false):pbcast.GMS(join_timeout=5000;join_retry_timeout=2000;shun=false;print_local_addr=true)"; |
|
63 |
private static final String DEFAULT_MULTICAST_IP = "231.12.21.132"; |
|
64 |
private NotificationBus bus;
|
|
65 |
|
|
66 |
/**
|
|
67 |
* Initializes the broadcasting listener by starting up a JavaGroups notification
|
|
68 |
* bus instance to handle incoming and outgoing messages.
|
|
69 |
*
|
|
70 |
* @param config An OSCache configuration object.
|
|
71 |
* @throws com.opensymphony.oscache.base.InitializationException If this listener has
|
|
72 |
* already been initialized.
|
|
73 |
*/
|
|
74 | 0 |
public synchronized void initialize(Cache cache, Config config) throws InitializationException { |
75 | 0 |
super.initialize(cache, config);
|
76 |
|
|
77 | 0 |
String properties = config.getProperty(CHANNEL_PROPERTIES); |
78 | 0 |
String multicastIP = config.getProperty(MULTICAST_IP_PROPERTY); |
79 |
|
|
80 | 0 |
if ((properties == null) && (multicastIP == null)) { |
81 | 0 |
multicastIP = DEFAULT_MULTICAST_IP; |
82 |
} |
|
83 |
|
|
84 | 0 |
if (properties == null) { |
85 | 0 |
properties = DEFAULT_CHANNEL_PROPERTIES_PRE + multicastIP.trim() + DEFAULT_CHANNEL_PROPERTIES_POST; |
86 |
} else {
|
|
87 | 0 |
properties = properties.trim(); |
88 |
} |
|
89 |
|
|
90 | 0 |
if (log.isInfoEnabled()) {
|
91 | 0 |
log.info("Starting a new JavaGroups broadcasting listener with properties=" + properties);
|
92 |
} |
|
93 |
|
|
94 | 0 |
try {
|
95 | 0 |
bus = new NotificationBus(BUS_NAME, properties);
|
96 | 0 |
bus.start(); |
97 | 0 |
bus.getChannel().setOpt(Channel.LOCAL, new Boolean(false)); |
98 | 0 |
bus.setConsumer(this);
|
99 | 0 |
log.info("JavaGroups clustering support started successfully");
|
100 |
} catch (Exception e) {
|
|
101 | 0 |
throw new InitializationException("Initialization failed: " + e); |
102 |
} |
|
103 |
} |
|
104 |
|
|
105 |
/**
|
|
106 |
* Shuts down the JavaGroups being managed by this listener. This
|
|
107 |
* occurs once the cache is shut down and this listener is no longer
|
|
108 |
* in use.
|
|
109 |
*
|
|
110 |
* @throws com.opensymphony.oscache.base.FinalizationException
|
|
111 |
*/
|
|
112 | 0 |
public synchronized void finialize() throws FinalizationException { |
113 | 0 |
if (log.isInfoEnabled()) {
|
114 | 0 |
log.info("JavaGroups shutting down...");
|
115 |
} |
|
116 |
|
|
117 | 0 |
bus.stop(); |
118 | 0 |
bus = null;
|
119 |
|
|
120 | 0 |
if (log.isInfoEnabled()) {
|
121 | 0 |
log.info("JavaGroups shutdown complete.");
|
122 |
} |
|
123 |
} |
|
124 |
|
|
125 |
/**
|
|
126 |
* Uses JavaGroups to broadcast the supplied notification message across the cluster.
|
|
127 |
*
|
|
128 |
* @param message The cluster nofication message to broadcast.
|
|
129 |
*/
|
|
130 | 0 |
protected void sendNotification(ClusterNotification message) { |
131 | 0 |
bus.sendNotification(message); |
132 |
} |
|
133 |
|
|
134 |
/**
|
|
135 |
* Handles incoming notification messages from JavaGroups. This method should
|
|
136 |
* never be called directly.
|
|
137 |
*
|
|
138 |
* @param serializable The incoming message object. This must be a {@link ClusterNotification}.
|
|
139 |
*/
|
|
140 | 0 |
public void handleNotification(Serializable serializable) { |
141 | 0 |
if (!(serializable instanceof ClusterNotification)) { |
142 | 0 |
log.error("An unknown cluster notification message received (class=" + serializable.getClass().getName() + "). Notification ignored."); |
143 |
|
|
144 | 0 |
return;
|
145 |
} |
|
146 |
|
|
147 | 0 |
handleClusterNotification((ClusterNotification) serializable); |
148 |
} |
|
149 |
|
|
150 |
/**
|
|
151 |
* We are not using the caching, so we just return something that identifies
|
|
152 |
* us. This method should never be called directly.
|
|
153 |
*/
|
|
154 | 0 |
public Serializable getCache() {
|
155 | 0 |
return "JavaGroupsBroadcastingListener: " + bus.getLocalAddress(); |
156 |
} |
|
157 |
|
|
158 |
/**
|
|
159 |
* A callback that is fired when a new member joins the cluster. This
|
|
160 |
* method should never be called directly.
|
|
161 |
*
|
|
162 |
* @param address The address of the member who just joined.
|
|
163 |
*/
|
|
164 | 0 |
public void memberJoined(Address address) { |
165 | 0 |
if (log.isInfoEnabled()) {
|
166 | 0 |
log.info("A new member at address '" + address + "' has joined the cluster"); |
167 |
} |
|
168 |
} |
|
169 |
|
|
170 |
/**
|
|
171 |
* A callback that is fired when an existing member leaves the cluster.
|
|
172 |
* This method should never be called directly.
|
|
173 |
*
|
|
174 |
* @param address The address of the member who left.
|
|
175 |
*/
|
|
176 | 0 |
public void memberLeft(Address address) { |
177 | 0 |
if (log.isInfoEnabled()) {
|
178 | 0 |
log.info("Member at address '" + address + "' left the cluster"); |
179 |
} |
|
180 |
} |
|
181 |
} |
|
182 |
|
|