public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠(chǎng),JMS 用它創(chuàng )建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客戶(hù)端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線(xiàn)程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰(shuí). // 獲取session注意參數值Queue.Name是Query的名字 Destination destination = session.createQueue("[color=red]Queue.Name[/color]"); // MessageProducer:消息生產(chǎn)者 MessageProducer producer = session.createProducer(destination); // 設置不持久化 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); // 發(fā)送一條消息 sendMsg(session, producer); session.commit(); connection.close(); } /** * 在指定的會(huì )話(huà)上,通過(guò)指定的消息生產(chǎn)者發(fā)出一條消息 * * @param session 消息會(huì )話(huà) * @param producer 消息生產(chǎn)者 */ public static void sendMsg(Session session, MessageProducer producer) throws JMSException { // 創(chuàng )建一條文本消息 TextMessage message = session.createTextMessage("Hello ActiveMQ!"); // 通過(guò)消息生產(chǎn)者發(fā)出消息 producer.send(message); System.out.println(""); }public static void main(String[] args) throws JMSException { // ConnectionFactory :連接工廠(chǎng),JMS 用它創(chuàng )建連接 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(ActiveMQConnection.DEFAULT_USER, ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616"); // JMS 客戶(hù)端到JMS Provider 的連接 Connection connection = connectionFactory.createConnection(); connection.start(); // Session: 一個(gè)發(fā)送或接收消息的線(xiàn)程 Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE); // Destination :消息的目的地;消息發(fā)送給誰(shuí). 
// 獲取session注意參數值xingbo.xu-queue是一個(gè)服務(wù)器的queue,須在在A(yíng)ctiveMq的console配置 Destination destination = session.createQueue("Queue.Name"); // 消費者,消息接收者 MessageConsumer consumer = session.createConsumer(destination); while(true) { TextMessage message = (TextMessage) consumer.receive(1000); if(null != message) System.out.println("收到消息:" + message.getText()); else break; } session.close(); connection.close(); }<!-- 配置JMS消息發(fā)送 --> <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop"> <property name="connectionFactory"> <bean class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL"> <value>tcp://localhost:61616</value> </property> </bean> </property> </bean> <!-- Spring JMS Template --> <bean id="myJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <property name="connectionFactory"> <ref local="jmsFactory" /> </property> </bean> <bean id="destination" class="org.apache.activemq.command.ActiveMQQueue"> <constructor-arg index="0"> <value>Queue.Name</value> </constructor-arg> </bean> <bean id="sender" class="demo.JmsQueueSender"> <property name="jmsTemplate" ref="myJmsTemplate"></property> </bean> <bean id="receive" class="demo.JmsQueueReceiver"></bean> <bean id="listenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="jmsFactory"></property> <property name="messageListener" ref="receive"></property> <property name="destination" ref="destination" /> </bean> <!-- 配置JMS消息發(fā)送完成 -->
/**
 * Sends a fixed test message to the queue "Queue.Name" through a Spring
 * {@link JmsTemplate}. The template is either injected directly or built
 * from a connection factory.
 *
 * NOTE(review): this class and JmsQueueReceiver are both public top-level
 * types; presumably each lives in its own source file.
 */
@Component
public class JmsQueueSender {

    private JmsTemplate jmsTemplate;

    /**
     * Builds a new template on top of the given connection factory.
     *
     * @param cf the JMS connection factory to wrap
     */
    public void setConnectionFactory(ConnectionFactory cf) {
        this.jmsTemplate = new JmsTemplate(cf);
    }

    /** Converts the string "test!!!" to a message and sends it to "Queue.Name". */
    public void simpleSend() {
        jmsTemplate.convertAndSend("Queue.Name", "test!!!");
    }

    /** @return the template currently used for sending */
    public JmsTemplate getJmsTemplate() {
        return jmsTemplate;
    }

    /** @param jmsTemplate the template to use for sending */
    public void setJmsTemplate(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }
}

/**
 * Message listener that prints the body of every {@link TextMessage} it
 * receives; other message types are ignored.
 */
@Component
public class JmsQueueReceiver implements MessageListener {

    /**
     * Prints the text of the message to standard output when it is a text message.
     *
     * @param message the incoming JMS message
     */
    @Override
    public void onMessage(Message message) {
        if (!(message instanceof TextMessage)) {
            return;
        }
        final TextMessage textMessage = (TextMessage) message;
        try {
            System.out.println(textMessage.getText());
        } catch (final JMSException e) {
            // Reading the body failed; report it and carry on.
            e.printStackTrace();
        }
    }
}

<!-- Broker persistence: KahaDB store under ${activemq.base}/data/kahadb. -->
<persistenceAdapter>
    <kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>

<!--
  Broker persistence: JDBC store using the data source bean with id "mysql-ds".
  NOTE(review): this and the KahaDB adapter above look like alternatives;
  presumably only one should be active in the broker configuration.
-->
<persistenceAdapter>
    <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>
<!--
  Pooled MySQL data source (Commons DBCP) with id "mysql-ds".
  NOTE(review): the user name and password are hard-coded here; consider
  moving them out of the configuration file.
-->
<bean id="mysql-ds"
      class="org.apache.commons.dbcp.BasicDataSource"
      destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="root"/>
    <property name="password" value="root"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
聯系客服