-
Notifications
You must be signed in to change notification settings - Fork 881
Expand file tree
/
Copy pathRabbitMQ.js
More file actions
79 lines (73 loc) · 2.34 KB
/
RabbitMQ.js
File metadata and controls
79 lines (73 loc) · 2.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
require('dotenv').config() // dotenv autoload
let amqp = require('amqplib');
const { getenv } = require("../util/utils")
module.exports = class RabbitMQ {
constructor() {
this.hosts = !!getenv('AMQP_HOST')? getenv('AMQP_HOST').split(',') : ["amqp://localhost"];
this.index = 0;
this.length = this.hosts.length;
this.open = amqp.connect(this.hosts[this.index]);
}
sendQueueMsg(queueName, msg, successCallback, errorCallBack) {
let self = this;
self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName).then(function (ok) {
return channel.sendToQueue(queueName, new Buffer.from(msg), {
persistent: true
});
}).then(function (data) {
if (data) {
typeof successCallback === "function" && successCallback("success");
channel.close();
}
}).catch(function (e) {
errorCallBack ** errorCallBack(e)
setTimeout(() => {
if (channel) {
channel.close();
}
}, 500)
});
})
.catch(function (e) {
typeof errorCallBack === "function" && errorCallBack(e)
});
}
receiveQueueMsg(queueName, receiveCallBack, errCallBack) {
let self = this;
self.open
.then(function (conn) {
return conn.createChannel();
})
.then(function (channel) {
return channel.assertQueue(queueName).then(function (ok) {
channel.prefetch(10, false);
return channel.consume(queueName, function (msg) {
if (msg !== null) {
let data = msg.content.toString();
receiveCallBack && receiveCallBack(data, function() {});
channel.ack(msg);
}
}).finally(function () { });
})
})
.catch(function (e) {
errCallBack(e)
/**
* 下面的逻辑是做容灾处理,会有多个rabbitmq服务用来切换
* @type {number}
let num = self.index++;
if (num <= self.length - 1) {
self.open = amqp.connect(self.hosts[num]);
} else {
self.index = 0;
self.open = amqp.connect(self.hosts[0]);
}
*/
});
}
}