@@ -14,11 +14,23 @@ use tokio::sync::mpsc;
14
14
pub struct LuaChannel {
15
15
name : String ,
16
16
id : i64 ,
17
+ pub receiver : Arc < Mutex < mpsc:: Receiver < i64 > > > ,
18
+ pub sender : Arc < Mutex < mpsc:: Sender < i64 > > > ,
17
19
}
18
20
19
21
impl LuaChannel {
20
- fn new ( name : String , id : i64 ) -> LuaChannel {
21
- LuaChannel { name, id }
22
+ fn new (
23
+ name : String ,
24
+ id : i64 ,
25
+ receiver : Arc < Mutex < mpsc:: Receiver < i64 > > > ,
26
+ sender : Arc < Mutex < mpsc:: Sender < i64 > > > ,
27
+ ) -> LuaChannel {
28
+ LuaChannel {
29
+ name,
30
+ id,
31
+ receiver,
32
+ sender,
33
+ }
22
34
}
23
35
24
36
pub fn mt_string ( & self ) -> String {
@@ -28,17 +40,13 @@ impl LuaChannel {
28
40
29
41
pub struct LuaChannelMgr {
30
42
channels : HashMap < String , LuaChannel > ,
31
- receivers : HashMap < i64 , Arc < Mutex < mpsc:: Receiver < i64 > > > > ,
32
- senders : HashMap < i64 , Arc < Mutex < mpsc:: Sender < i64 > > > > ,
33
43
id_counter : i64 ,
34
44
}
35
45
36
46
impl LuaChannelMgr {
37
47
pub fn new ( ) -> LuaChannelMgr {
38
48
LuaChannelMgr {
39
49
channels : HashMap :: new ( ) ,
40
- receivers : HashMap :: new ( ) ,
41
- senders : HashMap :: new ( ) ,
42
50
id_counter : 0 ,
43
51
}
44
52
}
@@ -47,23 +55,18 @@ impl LuaChannelMgr {
47
55
let ( sender, receiver) = mpsc:: channel ( 100 ) ;
48
56
let id = self . id_counter ;
49
57
self . id_counter += 1 ;
50
- let channel = LuaChannel :: new ( name. clone ( ) , id) ;
58
+ let channel = LuaChannel :: new (
59
+ name. clone ( ) ,
60
+ id,
61
+ Arc :: new ( Mutex :: new ( receiver) ) ,
62
+ Arc :: new ( Mutex :: new ( sender) ) ,
63
+ ) ;
51
64
self . channels . insert ( name. clone ( ) , channel) ;
52
- self . receivers . insert ( id, Arc :: new ( Mutex :: new ( receiver) ) ) ;
53
- self . senders . insert ( id, Arc :: new ( Mutex :: new ( sender) ) ) ;
54
65
}
55
66
56
67
pub fn get_channel ( & self , name : & str ) -> Option < LuaChannel > {
57
68
self . channels . get ( name) . cloned ( )
58
69
}
59
-
60
- pub fn get_sender ( & self , id : i64 ) -> Option < Arc < Mutex < mpsc:: Sender < i64 > > > > {
61
- self . senders . get ( & id) . cloned ( )
62
- }
63
-
64
- pub fn get_receiver ( & self , id : i64 ) -> Option < Arc < Mutex < mpsc:: Receiver < i64 > > > > {
65
- self . receivers . get ( & id) . cloned ( )
66
- }
67
70
}
68
71
69
72
lazy_static ! {
@@ -77,28 +80,20 @@ impl UserData for LuaChannel {
77
80
} ) ;
78
81
79
82
methods. add_async_method ( "push" , |lua, this, args : mlua:: MultiValue | async move {
80
- let id = this. id ;
81
83
let lua_seri_pack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_pack" ) ?;
82
84
let ptr = lua_seri_pack. call :: < i64 > ( args) . unwrap ( ) ;
83
- let opt_sender = { ChannelMgr . lock ( ) . unwrap ( ) . get_sender ( id) } ;
84
- if let Some ( sender) = opt_sender {
85
- let sender = sender. lock ( ) . unwrap ( ) ;
86
- sender. send ( ptr) . await . unwrap ( ) ;
87
- }
85
+ let sender = this. sender . lock ( ) . unwrap ( ) ;
86
+ sender. send ( ptr) . await . unwrap ( ) ;
88
87
Ok ( ( ) )
89
88
} ) ;
90
89
91
90
methods. add_method ( "pop" , |lua, this, ( ) | {
92
- let id = this. id ;
93
- let opt_receiver = { ChannelMgr . lock ( ) . unwrap ( ) . get_receiver ( id) } ;
94
- if let Some ( receiver) = opt_receiver {
95
- let data = receiver. lock ( ) . unwrap ( ) . try_recv ( ) ;
96
- if let Ok ( data) = data {
97
- let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
98
- let mut returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
99
- returns. insert ( 0 , mlua:: Value :: Boolean ( true ) ) ;
100
- return Ok ( returns) ;
101
- }
91
+ let data = this. receiver . lock ( ) . unwrap ( ) . try_recv ( ) ;
92
+ if let Ok ( data) = data {
93
+ let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
94
+ let mut returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
95
+ returns. insert ( 0 , mlua:: Value :: Boolean ( true ) ) ;
96
+ return Ok ( returns) ;
102
97
}
103
98
104
99
let mut returns = mlua:: MultiValue :: new ( ) ;
@@ -107,15 +102,11 @@ impl UserData for LuaChannel {
107
102
} ) ;
108
103
109
104
methods. add_async_method ( "bpop" , |lua, this, ( ) | async move {
110
- let id = this. id ;
111
- let opt_receiver = { ChannelMgr . lock ( ) . unwrap ( ) . get_receiver ( id) } ;
112
- if let Some ( receiver) = opt_receiver {
113
- let data = receiver. lock ( ) . unwrap ( ) . recv ( ) . await ;
114
- if let Some ( data) = data {
115
- let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
116
- let returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
117
- return Ok ( returns) ;
118
- }
105
+ let data = { this. receiver . lock ( ) . unwrap ( ) . recv ( ) . await } ;
106
+ if let Some ( data) = data {
107
+ let lua_seri_unpack = lua. globals ( ) . get :: < LuaFunction > ( "lua_seri_unpack" ) ?;
108
+ let returns = lua_seri_unpack. call :: < mlua:: MultiValue > ( data) . unwrap ( ) ;
109
+ return Ok ( returns) ;
119
110
}
120
111
121
112
let returns = mlua:: MultiValue :: new ( ) ;
0 commit comments