1.maven 安装
wget http://repos.fedorapeople.org/repos/dchen/apache-maven/epel-apache-maven.repo -O /etc/yum.repos.d/epel-apache-maven.repo
yum -y install apache-maven
2.ant 安装
yum -y install ant
3.git 安装
yum install git
#查看版本
git --version
#显示 git version +版本号 表示成功
#配置 git 名称以及邮箱
git config --global user.name "Your Name"
git config --global user.email "user@youremail"
4.rocketmq安装(注意这里版本是4.2.0)

cd /usr/local/rocketmq(没有则创建目录)
git clone -b develop https://github.com/apache/incubator-rocketmq.git
cd incubator-rocketmq
mvn -Prelease-all -DskipTests clean install -U
------------------

一段长时间的maven 依赖下载

-------------------
cd distribution/target/apache-rocketmq

2)配置文件
vim /etc/profile 添加

centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)-LMLPHP

#apache rocket-mq
export ROCKETMQ_HOME=/usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq
export PATH=$JAVA_HOME/bin:$ROCKETMQ_HOME/bin:$PATH
export NAMESRV_ADDR=自己服务器ip:9876

使profile 生效

source /etc/profile

进入到 /usr/local/rocketmq/incubator-rocketmq/distribution/target/apache-rocketmq/bin 目录下:添加权限

chmod +x mqadmin mqbroker mqfiltersrv mqshutdown mqnamesrv

启动:

nohup  mqnamesrv &

 //查看启动日志 默认在bin 目录的nohup.out下
tail -f nohup.out
显示如下信息 表示启动成功

centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)-LMLPHP

3)由于自己的服务器使用的是阿里云的 2g 内存,启动那个 mqnamesrv后,启动mqbroker时候需要设置下内存大小,否则会报错

vim  runbroker.sh(因为mqbroker脚本里面调用了runbroker.sh:
sh ${ROCKETMQ_HOME}/bin/runbroker.sh org.apache.rocketmq.broker.BrokerStartup $@) 修改下图:

centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)-LMLPHP

之后 启动 mqbroker并将启动日志写入到指定位置.进入到target/bin目录

 nohup mqbroker >/var/log/mq.log 2>&1 &

启动成功后,使用ps aux|grep rocketmq如下图

centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)-LMLPHP

4)写测试用例

1.pom.xml引入rocket包(引入的是4.1的包,4.2的引入后无法使用)

<!--4.2无法使用 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-all</artifactId>
<version>4.2.0</version>
<type>pom</type>
</dependency>
<!--4.1的引用包-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.1.0-incubating</version>
</dependency>

生产者main方法:
package cn.rocketmq;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException; /**
* Create by fan on 2018/4/16
*/
public class TestProductRocketMq {

    /**
     * Sends 100 test messages to {@code testTopic} and prints each send result.
     *
     * <p>Even iterations send a {@code tagA}/{@code keyA} message and odd iterations send a
     * {@code tagB}/{@code keyB} message. A fresh {@link Message} is built for every send so
     * that the tag never leaks from one iteration into the next.
     *
     * @param args unused
     * @throws InterruptedException if the pause between sends is interrupted
     * @throws RemotingException    propagated from {@code send}
     * @throws MQClientException    propagated from {@code start} or {@code send}
     * @throws MQBrokerException    propagated from {@code send}
     */
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException {
        final DefaultMQProducer defaultMQProducer = new DefaultMQProducer("producerGroupName");
        // NOTE(review): the article reports "connect to <:10909> failed"; enabling the line
        // below is presumably the alternative to opening that port -- confirm before use.
        // defaultMQProducer.setVipChannelEnabled(false);
        defaultMQProducer.setNamesrvAddr("47.98.111.19:9876");

        // Let a start-up failure propagate: sending on a producer that never started
        // would only fail later with a less helpful error.
        defaultMQProducer.start();
        try {
            for (int i = 0; i < 100; i++) {
                final Message message;
                if (i % 2 == 0) {
                    message = new Message("testTopic", "tagA", "keyA",
                            "Hello RocketMq".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                } else {
                    message = new Message("testTopic", "tagB", "keyB",
                            "Hello RocketMq.I'm your user".getBytes(java.nio.charset.StandardCharsets.UTF_8));
                }
                SendResult sendResult = defaultMQProducer.send(message);
                Thread.sleep(100);
                System.out.println("tags send result:" + sendResult);
            }
        } finally {
            // Release the producer even when a send fails, instead of relying on a
            // shutdown hook that is only registered after every send has succeeded.
            defaultMQProducer.shutdown();
        }
    }
}

消费者方法:

package cn.rocketmq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /**
* Create by fan on 2018/4/16
*/
public class TestConsumeRocketMq {

    /**
     * Starts a push consumer subscribed to {@code testTopic} (tags {@code tagA} and
     * {@code tagB}) and prints every message it receives.
     *
     * @param args unused
     * @throws MQClientException propagated from {@code subscribe} or {@code start}
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("testProducerGroupName");
        defaultMQPushConsumer.setNamesrvAddr("47.98.111.19:9876");
        defaultMQPushConsumer.subscribe("testTopic", "tagA || tagB");
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(Thread.currentThread().getName() + "Receive new message:" + list);
                // Handle every message in the batch: the whole batch is acknowledged below,
                // so anything skipped here would never be processed.
                for (MessageExt messageExt : list) {
                    System.out.println("messageExt:" + messageExt);
                    if (messageExt == null || !"testTopic".equals(messageExt.getTopic())) {
                        continue;
                    }
                    String tags = messageExt.getTags();
                    if ("tagA".equals(tags)) {
                        String mess = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("mess tagA consume:" + mess);
                    } else if ("tagB".equals(tags)) {
                        String mess = new String(messageExt.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                        System.out.println("mess tagB consume:" + mess);
                    }
                }
                // Acknowledge the batch as consumed.
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        defaultMQPushConsumer.start();
        System.out.println("Consume start.");
    }
}

之后运行main方法报错:

RocketMq Exception "connect to <:10909> failed"

原因是:centos防火墙没有开放10909以及9876端口。解决办法是开放这两个端口(使用 --permanent 添加后,需要执行 firewall-cmd --reload 才能生效)

firewall-cmd --zone=public --add-port=10909/tcp --permanent

firewall-cmd --zone=public --add-port=9876/tcp --permanent

 
之后,又碰到下面的问题:
 centos7 安装 maven 和ant git 以及 rocketmq 4.2安装过程(安装成功,调用失败)-LMLPHP

google了半天,尚未解决。。。

 

  

05-11 16:23