1.安装:
下载下面的rpm包:
erlang-17.4-1.el6.x86_64.rpm
haproxy-1.5.18-1.el6.x86_64.rpm
openssl-1.0.1e-57.el6.x86_64.rpm
rabbitmq-server-3.6.3-1.noarch.rpm
socat-1.7.2.4-1.el6.rf.x86_64.rpm
在linux中创建一个.sh文件里面复制进去下面命令:
#!/bin/bash
mydir=`pwd`
cd /etc/yum.repo.d/
rm -rf *.repo
cd $mydir
yum -y install openssl-1.0.1e-57.el6.x86_64
yum -y install socat-1.7.2.4-1.el6.rf.x86_64.rpm
yum -y install erlang-17.4-1.el6.x86_64.rpm
yum -y install rabbitmq-server-3.6.3-1.noarch.rpm
rabbitmq-plugins enable rabbitmq_management
service rabbitmq-server restart
rabbitmqctl add_user admin admin
rabbitmqctl set_permissions admin ".*" ".*" ".*"
rabbitmqctl set_user_tags admin administrator
修改创建文件的执行文件:chmod 777 文件名
执行完成之后:
启动:service rabbitmq-server start
停止:service rabbitmq-server stop
开放端口号:5672(端口号)和15672(客户端连接的端口号)
启动rabbitmq错误:
FAILED - check /var/log/rabbitmq/startup_\{log, _err\}
解决:
查看一下自己的主机名:hostname
改成127.0.0.1 localhost
vim /etc/hosts
2.和java做整合:
首先在spring的xml中的头文件中配置rabbit的头文件信息
xmlns:rabbit="http://www.springframework.org/schema/rabbit" xsi:schemaLocation="http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.4.xsd
然后就可以配置和rabbit连接的信息,这里定义的是直连虚拟机,就是点对点的
<!-- 配置connnection-factory,指定连接rabbit server 的参数 --> <rabbit:connection-factory id="connectionFactory" host="192.168.133.131" username="admin" password="admin" port="5672" virtual-host="1601"/> <!-- 通过指定下面的admin信息,当前provider中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 声明队列 --> <rabbit:queue auto-delete="true" durable="false" exclusive="false" name="queue_1" /> <!-- 声明direct exchange路由交换机,绑定队列,并且不持久化 --> <rabbit:direct-exchange name="dct_ex" auto-delete="true" durable="false"> <rabbit:bindings> <!-- 绑定队列与ROUTING_KEY --> <rabbit:binding queue="queue_1" key="rtkey" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 定义rabbit template 用于数据的接收和发送 --> <!-- 绑定关系 --> <rabbit:template id="amqTemplate" connection-factory="connectionFactory" exchange="dct_ex" routing-key="rtkey" />
自定义监听queue的类
<!-- queue litener 观察 监听模式 当有消息到达时会通知监听在对应队列上的监听对象 --> <rabbit:listener-container acknowledge="auto" connection-factory="connectionFactory"> <rabbit:listener queue-names="queue_1" ref="consumer" method="listen" /> </rabbit:listener-container> <bean id="consumer" class="www.lj.com.rabbitmq.Consumer" />
类的信息:
public class Consumer { @Resource private TvMapper tvMapper; //2.定义数据操作对象 private ObjectMapper objectMapper = new ObjectMapper(); //1 定义json转化的对象 //具体执行业务的方法 public void listen(String json) { System.out.println("从队列中取出:"+json); //3.转化成对象 try { T_tv entity = objectMapper.readValue(json, T_tv.class); //4.执行保存 tvMapper.insertMiddleTable(entity.getId(), entity.getTidss()); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } }
测试运行:
@Resource private RabbitTemplate tabbitTemplate; //把对象转换json格式的字符串 private ObjectMapper objectMapper = new ObjectMapper(); @Override public void insertObject(T_tv tv) throws Exception { tvMapper.insertObject(tv); String string = objectMapper.writeValueAsString(tv); tabbitTemplate.convertAndSend(string); }
然后在listen方法中就会从queue队列中取出效果如下:
3.配置定时任务:
需要在springxml中的头文件中配置:
xmlns:task="http://www.springframework.org/schema/task" xmlns:context="http://www.springframework.org/schema/context" http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
下面的配置和之前的一样,多出两句一个是扫描定时任务的包,一个是启动定时任务的注解
<!-- 配置connnection-factory,指定连接rabbit server 的参数 --> <rabbit:connection-factory id="connectionFactory" host="192.168.133.131" username="admin" password="admin" port="5672" virtual-host="1601I"/> <!-- 通过指定下面的admin信息,当前provider中的exchange和queue会在rabbitmq服务器上自动生成 --> <rabbit:admin connection-factory="connectionFactory" /> <!-- 声明队列 --> <rabbit:queue auto-delete="true" durable="false" exclusive="false" name="queue_1" /> <!-- 声明direct exchange路由交换机,绑定队列,并且持久化 --> <rabbit:direct-exchange name="dct_ex" auto-delete="true" durable="false"> <rabbit:bindings> <!-- 绑定队列与ROUTING_KEY --> <rabbit:binding queue="queue_1" key="rtkey" /> </rabbit:bindings> </rabbit:direct-exchange> <!-- 定义rabbit template 用于数据的接收和发送 --> <!-- 绑定关系 --> <rabbit:template id="amqTemplate" connection-factory="connectionFactory" exchange="dct_ex" routing-key="rtkey" /> <!-- 配置定时任务扫描的包 --> <context:component-scan base-package="www.lj.com.rabbitmq"/> <!-- 启动定时任务的注解 --> <task:annotation-driven/>
下面是扫描包下的类:
@Component public class ConsumerTask { // * * * * * * //第一个* :多少秒执行一次 如果为*则为每秒执行一次 //第二个* :多少分钟执行一次 //第三个* :多少小时执行一次 //第四个* :一个月中的第几天执行一次 //第五个* :第几个月执行一次 //第六个* :一周中的星期几执行一次通常情况下用'?' //?只能用于第四个和第六个其中一个,因为两个会冲突 //-表示范围 //'/'表示每隔多长时间执行一次 //'*'表示任意值 @Resource private RabbitTemplate rabbitTemplate; @Resource private TvMapper tvMapper; private ObjectMapper objectMapper = new ObjectMapper(); @Scheduled(cron="*/2 * * * * *") public void regularTask() throws Exception{ System.out.println(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date(System.currentTimeMillis()))); //接收到的消息 Message receive = rabbitTemplate.receive("queue_1"); if(receive!=null){ //获取收到的数据 byte[] body = receive.getBody(); //将获取到的字节数组转化为String类型的字符串 String json = new String(body); T_tv t_tv = objectMapper.readValue(json, T_tv.class); tvMapper.insertMiddleTable(t_tv.getId(),t_tv.getTidss()); }else{ System.out.println("队列中无数据......"); } } }
测试运行:
类里配置的2秒去queue队列中获取一次得到以下结果:
改成30秒后添加两条数据:
将时间改为2秒执行1次效果如下: