Spring Cloud Stream的使用(下)

<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.learn</groupId>
  <artifactId>order</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>
  
  <parent>
    <groupId>cn.learn</groupId>
    <artifactId>microcloud02</artifactId>
    <version>0.0.1</version>
  </parent>
	
	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
		<thymeleaf.version>3.0.9.RELEASE</thymeleaf.version>
		<thymeleaf-layout-dialect.version>2.2.2</thymeleaf-layout-dialect.version>
	</properties>
	
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-amqp</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
		<dependency>
		  <groupId>org.projectlombok</groupId>
		  <artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
		    <groupId>org.springframework.cloud</groupId>
		    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>
	
	<!-- 这个插件,可以将应用打包成一个可执行的jar包 -->
	<build>
	    <plugins>
	        <plugin>
	            <groupId>org.springframework.boot</groupId>
	            <artifactId>spring-boot-maven-plugin</artifactId>
	        </plugin>
	    </plugins>
	</build>
  
  
</project>
server.port=8010

eureka.client.serviceUrl.defaultZone=http://admin:1234@10.40.8.152:8761/eureka

spring.application.name=order
eureka.instance.prefer-ip-address=true
eureka.instance.instance-id=${spring.application.name}:${spring.cloud.client.ipAddress}:${spring.application.instance_id:${server.port}}

spring.rabbitmq.host=59.110.158.145
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

spring.cloud.stream.bindings.myMessage.group=order
spring.cloud.stream.bindings.myMessage.content-type=application/json
package com.learn.message;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface StreamClient {
    // 报错:Invalid bean definition with name 'myMessageOrdersssss' 
	// defined in com.imooc.order.message.StreamClient: bean definition with this name already exists
    //解决方法:@Input和@Output不可一样,同一服务里面的信道名字不能一样,在不同的服务里可以相同名字的信道
    String INPUT = "myMessage1";
    String INPUT2 = "myMessage2";
    
    /**
     * 接收消息、入口
     * @return
     */
    @Input(StreamClient.INPUT)
    SubscribableChannel input();    
    /**
     * 发送消息
     * @return
     */
    @Output(StreamClient.INPUT)
    MessageChannel output();
    
    /**
     * 接收消息、入口
     * @return
     */
    @Input(StreamClient.INPUT2)
    SubscribableChannel input2();    
    /**
     * 发送消息
     * @return
     */
    @Output(StreamClient.INPUT2)
    MessageChannel output2();
 
}
package com.learn.message;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Component;

import com.learn.dto.OrderDTO;

import lombok.extern.slf4j.Slf4j;

/**
 * 消息接收者
 * @author Leon.Sun
 *
 */
@Component
@EnableBinding(StreamClient.class)
@Slf4j
public class StreamReceiver {
	
	private  final Logger log = LoggerFactory.getLogger(StreamReceiver.class);
	
//    @StreamListener(StreamClient.OUTPUT)
//    public void process(Object message){
//        System.out.println(message);
//        log.info("StreamReceiver:{}",message);
//    }
 
    /**
     * 接收orderDTO对象消息
     * */
//	@StreamListener(value = StreamClient.OUTPUT)
//    public void process(OrderDTO message) {
//        log.info("message : {}", message);
//    }
	
	@StreamListener(StreamClient.INPUT)
    @SendTo(StreamClient.INPUT2)
    public String process(OrderDTO message) {
        log.info("message : {}", message);

        return "success";
    }

	
	@StreamListener(StreamClient.INPUT2)
    public void success(String message) {
        log.info("message : {}", message);
    }
 
}
package com.learn.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import com.learn.dto.OrderDTO;
import com.learn.message.StreamClient;

@RestController
public class SendMessageController {
    @Autowired
    private StreamClient streamClient;
 
//    @GetMapping("/sendMessage")
//    public void process(){
//        String message = "now "+new Date();
//        streamClient.output().send(MessageBuilder.withPayload(message).build());
//    }
 
//    @GetMapping("/sendMessage")
//    public void process(){
//        OrderDTO orderDTO = new OrderDTO();
//        orderDTO.setOrderId("123456");
//        orderDTO.setOrderStatus(1);
//        orderDTO.setBuyerAddress("SZ");
//        orderDTO.setBuyerPhone("188");
//        for(int i=0;i<10;i++) {
//        	orderDTO.setBuyerName("=====第"+i+"条数据=====:");
//        	MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
            streamClient.output().send(messageBuilder.build());
//            streamClient.input().send(messageBuilder.build());
//        }
//    }
    
    @GetMapping("/sendMessage")
    public void send() {
        OrderDTO orderDTO = new OrderDTO();
        orderDTO.setOrderId("123465");
        MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO);
        streamClient.output().send(messageBuilder.build());
    }
}
package com.learn;

import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

/**
 * 自动配置
 * @author Leon.Sun
 *
 */
@SpringBootApplication
@EnableRabbit
@EnableEurekaClient
public class OrderAmqpApplication {

	public static void main(String[] args) {
		// Spring应用启动起来
		SpringApplication.run(OrderAmqpApplication.class,args);
	}
	
}
https://blog.51cto.com/zero01/2173288

spring.cloud.stream.bindings.myMessageOutput.content-type=application/json