<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<!-- Maven build descriptor for the "order" module, packaged as a jar. -->
<modelVersion>4.0.0</modelVersion>
<groupId>com.learn</groupId>
<artifactId>order</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<!-- Parent POM. None of the dependencies below declare a version, so versions are
     presumably managed by this parent; verify against the parent POM.
     NOTE(review): the parent groupId is cn.learn while this module uses com.learn;
     confirm that the difference is intentional. -->
<parent>
<groupId>cn.learn</groupId>
<artifactId>microcloud02</artifactId>
<version>0.0.1</version>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<!-- NOTE(review): no thymeleaf dependency is declared in this POM, so the two
     version properties below may be unused here; confirm before removing. -->
<thymeleaf.version>3.0.9.RELEASE</thymeleaf.version>
<thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version>
</properties>
<dependencies>
<!-- Spring AMQP (RabbitMQ) starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- Spring MVC web starter. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Test support, available on the test classpath only. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Lombok annotation processing. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!-- Eureka client starter for service registration and discovery. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-eureka</artifactId>
</dependency>
<!-- Spring Cloud Stream with the RabbitMQ binder. -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
</dependencies>
<!-- This plugin packages the application as an executable jar. -->
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
# Runtime configuration for the "order" service.

# HTTP port the embedded server listens on.
server.port=8010
# Eureka registry location.
# NOTE(review): the user name and password are embedded in the URL in plain text;
# consider supplying them from the environment or a secret store.
eureka.client.serviceUrl.defaultZone=http://admin:1234@10.40.8.152:8761/eureka
spring.application.name=order
# Register with the instance IP address rather than the host name.
eureka.instance.prefer-ip-address=true
# Instance id is built as <application name>:<client ip>:<instance_id, falling back to server.port>.
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ipAddress}:${spring.application.instance_id:${server.port}}
# RabbitMQ broker connection.
# NOTE(review): guest/guest against a non-local host; confirm the broker accepts it.
spring.rabbitmq.host=59.110.158.145
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672
# Consumer group and content type for the binding named "myMessage".
# NOTE(review): StreamClient declares the channels "myMessage1" and "myMessage2";
# no channel named "myMessage" is visible, so these two settings may not apply to
# any binding. Confirm the intended binding name.
spring.cloud.stream.bindings.myMessage.group=order
spring.cloud.stream.bindings.myMessage.content-type=application/json
package com.learn.message;
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
/**
 * Spring Cloud Stream binding interface that declares the message channels of
 * this service.
 *
 * <p>Every {@code @Input}/{@code @Output} value is registered as a bean name, so
 * all channel names inside one service must be unique. Re-using a name for both
 * an input and an output fails at startup with
 * "Invalid bean definition with name '...': bean definition with this name
 * already exists". Different services may use the same channel names.
 *
 * <p>For that reason the outputs have their own names ({@link #OUTPUT},
 * {@link #OUTPUT2}). A binding's destination defaults to its channel name, so to
 * have an output publish to the destination consumed by the matching input,
 * point it there in the configuration, for example:
 * <pre>
 * spring.cloud.stream.bindings.myMessage1Output.destination=myMessage1
 * spring.cloud.stream.bindings.myMessage2Output.destination=myMessage2
 * </pre>
 */
public interface StreamClient {

    /** Channel name of the first inbound binding. */
    String INPUT = "myMessage1";

    /** Channel name of the second inbound binding. */
    String INPUT2 = "myMessage2";

    /** Channel name of the first outbound binding; distinct from {@link #INPUT}. */
    String OUTPUT = "myMessage1Output";

    /** Channel name of the second outbound binding; distinct from {@link #INPUT2}. */
    String OUTPUT2 = "myMessage2Output";

    /**
     * Inbound channel for the first message flow.
     *
     * @return the subscribable channel bound to {@link #INPUT}
     */
    @Input(StreamClient.INPUT)
    SubscribableChannel input();

    /**
     * Outbound channel for the first message flow.
     *
     * @return the channel bound to {@link #OUTPUT}
     */
    @Output(StreamClient.OUTPUT)
    MessageChannel output();

    /**
     * Inbound channel for the second message flow.
     *
     * @return the subscribable channel bound to {@link #INPUT2}
     */
    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();

    /**
     * Outbound channel for the second message flow.
     *
     * @return the channel bound to {@link #OUTPUT2}
     */
    @Output(StreamClient.OUTPUT2)
    MessageChannel output2();
}
package com.learn.message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;
import com.learn.dto.OrderDTO;
import lombok.extern.slf4j.Slf4j;
/**
 * Message receiver: consumes messages from the channels declared by
 * {@link StreamClient}.
 *
 * <p>The logger is declared explicitly as a single static field. The previous
 * combination of Lombok's {@code @Slf4j} with a hand-written instance field named
 * {@code log} declared the same logger twice.
 *
 * @author Leon.Sun
 */
@Component
@EnableBinding(StreamClient.class)
public class StreamReceiver {

    private static final Logger log = LoggerFactory.getLogger(StreamReceiver.class);

    /**
     * Handles an {@link OrderDTO} arriving on {@link StreamClient#INPUT}, logs it,
     * and forwards an acknowledgement to the channel named by
     * {@link StreamClient#INPUT2}.
     *
     * @param message the received order payload
     * @return the literal acknowledgement {@code "success"}
     */
    @StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.INPUT2)
    public String process(OrderDTO message) {
        log.info("message : {}", message);
        return "success";
    }

    /**
     * Handles the acknowledgement arriving on {@link StreamClient#INPUT2} and logs it.
     *
     * @param message the acknowledgement text
     */
    @StreamListener(StreamClient.INPUT2)
    public void success(String message) {
        log.info("message : {}", message);
    }
}
package com.learn.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.learn.dto.OrderDTO;
import com.learn.message.StreamClient;
@RestController
public class SendMessageController {
@Autowired
private StreamClient streamClient;
// @GetMapping("/sendMessage")
// public void process(){
// String message = "now "+new Date();
// streamClient.output().send(MessageBuilder.withPayload(message).build());
// }
// @GetMapping("/sendMessage")
// public void process(){
// OrderDTO orderDTO = new OrderDTO();
// orderDTO.setOrderId("123456");
// orderDTO.setOrderStatus(1);
// orderDTO.setBuyerAddress("SZ");
// orderDTO.setBuyerPhone("188");
// for(int i=0;i<10;i++) {
// orderDTO.setBuyerName("=====第"+i+"条数据=====:");
// MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
streamClient.output().send(messageBuilder.build());
// streamClient.input().send(messageBuilder.build());
// }
// }
@GetMapping("/sendMessage")
public void send() {
OrderDTO orderDTO = new OrderDTO();
orderDTO.setOrderId("123465");
MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
streamClient.output().send(messageBuilder.build());
}
}
package com.learn;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
/**
 * Application bootstrap class. Enables Spring Boot auto-configuration,
 * {@code @EnableRabbit} and {@code @EnableEurekaClient}.
 *
 * @author Leon.Sun
 */
@SpringBootApplication
@EnableRabbit
@EnableEurekaClient
public class OrderAmqpApplication {

    /**
     * Program entry point.
     *
     * @param args command-line arguments, passed through to Spring
     */
    public static void main(String[] args) {
        // Build the application from this class as its source, then start it.
        SpringApplication application = new SpringApplication(OrderAmqpApplication.class);
        application.run(args);
    }
}
https://blog.51cto.com/zero01/2173288
# Sets the content type of the binding named "myMessageOutput" to JSON.
# NOTE(review): the StreamClient shown in this file declares no channel named
# "myMessageOutput"; confirm which binding or service this line belongs to.
spring.cloud.stream.bindings.myMessageOutput.content-type=application/json