I have 2 types of queues for each message type - 1. REQUEST_QUEUE - to send message 2. RESPONSE_QUEUE - message To get feedback.
So if there are 5 message types, then we have 10 queues.
As a customer, I want to send a message in all the requested queue and I want to hear the response line according to the co-relation ID. To publish a message that I am using the code below:
public static zero publishing message (string exchange, string routing, string reaction key, list & lt; string & gt; Message list) IOException, exception {ConnectionFactory factory = new ConnectionFactory (); Factory.setHost (Constants.rabbitMQServerProperties.HOST_NAME); Factory.setUsername (Constants.rabbitMQServerProperties.USER_NAME); Factory.setPassword (Constants.rabbitMQServerProperties.PASSWORD); Factory.setport (constants.rabbitMQServerProperties.PORT); Connection connection = factory.New connection (); Channel channel = connection.createChannel (); //System.out.println (channel.isOpen ()); //System.out.println(message); (String Message: Message List) {channel.basicPublish (Exchange, Routing, True, Message Protocol.PERSISTENT_BASIC, message.getBytes ()); } Channel.close (); Connection.close (); System.out.println ("The message has been successfully published! \ N \ n"); Connection con = factory.newConnection (); // Executioner Service Thread Expector = Exporter. Nfix and Thipple (5); // Response queue worker fast = listen to new workers (0, Thread Exclorator, Con. Content channel (), feedback); }
On the last line, once I send the message, I am listening to the message queue.
Worker. Java:
Public class employees default consumer {string name; Channel channel; String queue; Int processed; Executor Service Executor; Public worker (int prefetch, executives thrad exclater, channel c, string que) throws an exception {super (c); Channel = C; Q = q; Channel.basicQos (prefetch); Channel.basicConsume (queue, wrong, this); Executor = Thread Accelerator; } @ Override Public Wide Handle Delivery (String ConsumerTag, Envelope Envelope, AMQP.Bitic Properties Properties, Byte [] Body) IOException throws {string feedback = new string (body); String routing key = envelope.gateetting (); String contentType = properties.getContentType (); String correlation ID = properties .getCorrelationId (); System.out.println (Q + "response ::" + response); My problems are: -
While listening to the queue, I only need to select my messages which I have presented, I have a list of correlation IDs) and if this is my corrupt ID I do not need to send the message back in the response queue.
-
Once I get my message, I have to close my connection.
- If I am sending messages in one of my 5 queens, my approach to sending is; Is it fair to hear?
Thanks in advance!
No comments:
Post a Comment