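ActiveMQ Learning Series (4): Persisting Messages to MySQL

Preface: My learning is still fairly scattered, and I have not yet found a systematic way to study ActiveMQ. I came across a message persistence demo online, worked through it, and am recording it here.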

 

I. Persistence stores currently supported by ActiveMQ

URL: http://activemq.apache.org/persistence.html

1. Replicated LevelDB Store

2. LevelDB Store

3. KahaDB

4. JDBC, combined with its bundled high performance journal. According to the official documentation, the built-in journal works much like a cache layer: messages are written to the journal first, and a background task checks at regular intervals for messages that need to be written to JDBC.
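The journal-then-JDBC behaviour described in item 4 can be pictured with a small toy model. This is only a sketch of the idea, not ActiveMQ code: the class `JournalSketch`, its methods, and the in-memory list and map standing in for the journal and the database are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of a fast journal in front of a slower JDBC store. Not ActiveMQ code. */
class JournalSketch {
    // Stand-in for the append-only journal (hypothetical).
    private final List<String[]> journal = new ArrayList<>();
    // Stand-in for the rows stored in the database (hypothetical).
    private final Map<String, String> database = new LinkedHashMap<>();

    /** Storing a message only appends it to the journal. */
    void write(String id, String body) {
        journal.add(new String[] {id, body});
    }

    /** What a periodic background task would do: copy pending entries to the database. */
    int checkpoint() {
        int flushed = journal.size();
        for (String[] entry : journal) {
            database.put(entry[0], entry[1]);
        }
        journal.clear();
        return flushed;
    }

    int pendingInJournal() {
        return journal.size();
    }

    int rowsInDatabase() {
        return database.size();
    }

    public static void main(String[] args) {
        JournalSketch store = new JournalSketch();
        store.write("m1", "hello");
        store.write("m2", "world");
        System.out.println("pending=" + store.pendingInJournal() + " rows=" + store.rowsInDatabase());
        store.checkpoint();
        System.out.println("pending=" + store.pendingInJournal() + " rows=" + store.rowsInDatabase());
    }
}
```

In this model the database sees nothing until `checkpoint()` runs, which is the point of the cache-like layer: writes stay cheap, and the slower store is updated in batches.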

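II. Configuring activemq.xml

1. Modify persistenceAdapter

        <persistenceAdapter>
            <!-- <kahaDB directory="${activemq.data}/kahadb"/>-->
            <jdbcPersistenceAdapter dataSource="#my-ds"/>
        </persistenceAdapter>

Here we commented out the default KahaDB store and added a JDBC persistence adapter that references the data source my-ds.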

2. Add the data source

<bean id="my-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url" value="jdbc:mysql://192.168.2.140:3306/activemq?characterEncoding=utf-8" />
    <property name="username" value="root" />
    <property name="password" value="123456" />
    <property name="initialSize" value="5" />
    <property name="maxTotal" value="100" />
    <property name="maxIdle" value="30" />
    <property name="maxWaitMillis" value="10000" />
    <property name="minIdle" value="1" />
</bean>

A few things to note here:

2.1: The data source class configured above belongs to dbcp2. With a 1.x-series jar, startup fails with a ClassNotFoundException.

2.2: Copy the MySQL driver jar and the dbcp jar into the lib directory under the main installation directory:

/usr/local/apache-activemq-5.14.4/lib

2.3: I recommend starting the broker with bin/activemq console so that errors are visible immediately. On startup I got the following error:

 WARN | Failure Details: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'xckk_star_act.ACTIVEMQ_ACKS' doesn't exist
        at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.8.0_121]
        at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)[:1.8.0_121]
        at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)[:1.8.0_121]
        at java.lang.reflect.Constructor.newInstance(Constructor.java:423)[:1.8.0_121]
        at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.Util.getInstance(Util.java:387)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:939)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3878)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3814)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2478)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2625)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2551)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1861)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2073)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2009)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeLargeUpdate(PreparedStatement.java:5094)[mysql-connector-java-5.1.38.jar:5.1.38]
        at com.mysql.jdbc.PreparedStatement.executeUpdate(PreparedStatement.java:1994)[mysql-connector-java-5.1.38.jar:5.1.38]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.commons.dbcp2.DelegatingPreparedStatement.executeUpdate(DelegatingPreparedStatement.java:98)[commons-dbcp2-2.1.1.jar:2.1.1]
        at org.apache.activemq.store.jdbc.adapter.DefaultJDBCAdapter.doDeleteOldMessages(DefaultJDBCAdapter.java:832)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter.cleanup(JDBCPersistenceAdapter.java:349)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at org.apache.activemq.store.jdbc.JDBCPersistenceAdapter$3.run(JDBCPersistenceAdapter.java:327)[activemq-jdbc-store-5.14.4.jar:5.14.4]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)[:1.8.0_121]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)[:1.8.0_121]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)[:1.8.0_121]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)[:1.8.0_121]
        at java.lang.Thread.run(Thread.java:745)[:1.8.0_121]

After searching online, I found that the database's character set needs to be latin1.
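One plausible explanation for the latin1 requirement (an assumption on my part; the sources I found do not spell it out) is index key size. The ACTIVEMQ_ACKS primary key spans three VARCHAR(250) columns plus a BIGINT, and MyISAM caps a single index key at 1000 bytes. The sketch below does only that arithmetic; `KeyLengthSketch` is a hypothetical name, and it ignores any per-column storage overhead.

```java
/** Back-of-the-envelope size of the ACTIVEMQ_ACKS primary key. Illustration only. */
class KeyLengthSketch {
    // MyISAM limit on the total length of one index key, in bytes.
    static final int MYISAM_MAX_KEY_BYTES = 1000;

    /** Three VARCHAR(250) key columns plus one BIGINT (8 bytes). */
    static int primaryKeyBytes(int bytesPerChar) {
        int varcharColumns = 3;   // CONTAINER, CLIENT_ID, SUB_NAME
        int charsPerColumn = 250;
        int bigintBytes = 8;      // PRIORITY
        return varcharColumns * charsPerColumn * bytesPerChar + bigintBytes;
    }

    public static void main(String[] args) {
        // latin1 stores one byte per character; MySQL's utf8 needs up to three.
        System.out.println("latin1: " + primaryKeyBytes(1) + " bytes");
        System.out.println("utf8:   " + primaryKeyBytes(3) + " bytes");
        System.out.println("limit:  " + MYISAM_MAX_KEY_BYTES + " bytes");
    }
}
```

Under these assumptions the key fits with latin1 (758 bytes) but not with utf8 (2258 bytes). If the broker's automatic table creation fails for that reason, later statements would report that the table doesn't exist, which matches the error above.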

 

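So I created a new database with its character set set to latin1 and started ActiveMQ again. Checking the database, I found three new tables: ACTIVEMQ_ACKS, ACTIVEMQ_LOCK, and ACTIVEMQ_MSGS.

I exported their structure here so the tables can be created by hand: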

/*
SQLyog v10.2
MySQL - 5.1.71 : Database - activemq
*********************************************************************
*/

/*!40101 SET NAMES utf8 */;
/*!40101 SET SQL_MODE=''*/;
/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */;
/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */;
/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */;
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;

CREATE DATABASE /*!32312 IF NOT EXISTS*/`activemq` /*!40100 DEFAULT CHARACTER SET latin1 COLLATE latin1_bin */;

USE `activemq`;

/*Table structure for table `ACTIVEMQ_ACKS` */

DROP TABLE IF EXISTS `ACTIVEMQ_ACKS`;

CREATE TABLE `ACTIVEMQ_ACKS` (
  `CONTAINER` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_DEST` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `CLIENT_ID` varchar(250) COLLATE latin1_bin NOT NULL,
  `SUB_NAME` varchar(250) COLLATE latin1_bin NOT NULL,
  `SELECTOR` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `LAST_ACKED_ID` bigint(20) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`CONTAINER`,`CLIENT_ID`,`SUB_NAME`,`PRIORITY`),
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_LOCK` */

DROP TABLE IF EXISTS `ACTIVEMQ_LOCK`;

CREATE TABLE `ACTIVEMQ_LOCK` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*Table structure for table `ACTIVEMQ_MSGS` */

DROP TABLE IF EXISTS `ACTIVEMQ_MSGS`;

CREATE TABLE `ACTIVEMQ_MSGS` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_PROD` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) COLLATE latin1_bin DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=latin1 COLLATE=latin1_bin;

/*!40101 SET SQL_MODE=@OLD_SQL_MODE */;
/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */;
/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */;
/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */;

 

III. Testing message persistence

package com.ckl.activemq;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class HelloActiveMQ {

    public static void main(String[] args) throws Exception {
        HelloWorldProducer producer = new HelloWorldProducer();
        HelloWorldConsumer consumer = new HelloWorldConsumer();

        Thread threadProducer = new Thread(producer);
        threadProducer.start();

        // The consumer is commented out; otherwise it would consume the message
        // right away, before we get a chance to see it persisted.
//        Thread threadConsumer = new Thread(consumer);
//        threadConsumer.start();
    }

    public static class HelloWorldProducer implements Runnable {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.2.140:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");

                // Create a MessageProducer from the Session to the Topic or Queue
                MessageProducer producer = session.createProducer(destination);
//                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                producer.setDeliveryMode(DeliveryMode.PERSISTENT); // the message must be persisted

                // Create a message
                String text = "Hello world! From: " + Thread.currentThread().getName() + " : " + this.hashCode();
                TextMessage message = session.createTextMessage(text);

                // Tell the producer to send the message
                System.out.println("Sent message: " + message.hashCode() + " : " + Thread.currentThread().getName());
                producer.send(message);

                // Clean up
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }
    }

    public static class HelloWorldConsumer implements Runnable, ExceptionListener {
        public void run() {
            try {
                // Create a ConnectionFactory
                ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                        "tcp://192.168.2.140:61616");

                // Create a Connection
                Connection connection = connectionFactory.createConnection();
                connection.start();
                connection.setExceptionListener(this);

                // Create a Session
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

                // Create the destination (Topic or Queue)
                Destination destination = session.createQueue("DemoQueue");

                // Create a MessageConsumer from the Session to the Topic or Queue
                MessageConsumer consumer = session.createConsumer(destination);

                // Wait up to one second for a message
                Message message = consumer.receive(1000);

                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    String text = textMessage.getText();
                    System.out.println("Received: " + text);
                } else {
                    System.out.println("Received: " + message);
                }

                consumer.close();
                session.close();
                connection.close();
            } catch (Exception e) {
                System.out.println("Caught: " + e);
                e.printStackTrace();
            }
        }

        public synchronized void onException(JMSException ex) {
            System.out.println("JMS Exception occurred.  Shutting down client.");
        }
    }
}

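After running it, the result is as follows:

IV. Start the consumer alone and check the database

After commenting out the producer above and starting only the consumer, I found that the message in the database had been consumed.

V. Notes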

Although the JDBC Store does not offer the best performance, it makes it fairly simple to create a robust Master-Slave broker setup.
When a group of ActiveMQ brokers is configured to use a shared database, they’ll all try to connect and grab a lock in the lock table,
but only one will succeed and become the master. The remaining brokers will be slaves, and will be in a wait state, not accepting client
connections until the master fails.
The lock table is ACTIVEMQ_LOCK, and it is used to ensure that only one ActiveMQ broker instance can access the database at one time.
If an ActiveMQ broker can’t grab the database lock, that broker won’t initialize fully, and will wait until the lock becomes free, or it’s shut down.

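I came across the passage above and am adding it here as a supplement. When ActiveMQ is configured in master/slave mode, every broker tries to connect to the same data source; the ACTIVEMQ_LOCK table exists to prevent the problems that contention for that resource would cause.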
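The master election described above can be modelled in a few lines. This is a toy sketch, not ActiveMQ's locking code: `SharedLockSketch` and its methods are hypothetical names, and a single in-memory field stands in for the row in ACTIVEMQ_LOCK.

```java
/** Toy model of brokers competing for one shared lock. Not ActiveMQ code. */
class SharedLockSketch {
    // Stand-in for the lock row; null means no broker currently holds it.
    private String holder;

    /** A broker becomes master only if it grabs the lock while it is free. */
    synchronized boolean tryBecomeMaster(String brokerName) {
        if (holder == null) {
            holder = brokerName;
            return true;
        }
        return false;
    }

    /** The master failing or shutting down frees the lock for a waiting slave. */
    synchronized void release(String brokerName) {
        if (brokerName.equals(holder)) {
            holder = null;
        }
    }

    synchronized String currentMaster() {
        return holder;
    }

    public static void main(String[] args) {
        SharedLockSketch lock = new SharedLockSketch();
        System.out.println("brokerA master? " + lock.tryBecomeMaster("brokerA"));
        System.out.println("brokerB master? " + lock.tryBecomeMaster("brokerB"));
        lock.release("brokerA"); // simulate the master going away
        System.out.println("brokerB master after failover? " + lock.tryBecomeMaster("brokerB"));
    }
}
```

A real slave would keep retrying in a wait loop rather than calling once; the sketch only shows that at most one broker holds the lock at any time and that releasing it lets another take over.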

 
