@@ -4,6 +4,8 @@ import { User } from '../user/user.model';
4
4
import { Thread } from '../thread/thread.model' ;
5
5
import { Message } from '../message/message.model' ;
6
6
7
+ import * as _ from 'lodash' ;
8
+
7
9
const initialMessages : Message [ ] = [ ] ;
8
10
9
11
interface IMessagesOperation extends Function {
@@ -26,6 +28,8 @@ export class MessagesService {
26
28
// action streams
27
29
create : Subject < Message > = new Subject < Message > ( ) ;
28
30
markThreadAsRead : Subject < any > = new Subject < any > ( ) ;
31
+ remove : Subject < Message > = new Subject < Message > ( ) ;
32
+ removeThreadMessages : Subject < any > = new Subject < any > ( ) ;
29
33
30
34
constructor ( ) {
31
35
this . messages = this . updates
@@ -84,13 +88,41 @@ export class MessagesService {
84
88
} )
85
89
. subscribe ( this . updates ) ;
86
90
91
+ // Remove all messages from particular thread
92
+ this . removeThreadMessages
93
+ . map ( ( thread : Thread ) => {
94
+ return ( messages : any ) => {
95
+ _ . remove ( messages , ( el : any ) => {
96
+ return el . id === thread . id
97
+ } ) ;
98
+ return messages ;
99
+ } ;
100
+ } )
101
+ . subscribe ( this . updates ) ;
102
+
103
+ // Remove all messages from message stream
104
+ this . remove
105
+ . map ( function ( ) : IMessagesOperation {
106
+ return ( messages : Message [ ] ) => {
107
+ return messages = [ ] ;
108
+ } ;
109
+ } )
110
+ . subscribe ( this . updates ) ;
87
111
}
88
112
89
113
// an imperative function call to this action stream
90
114
addMessage ( message : Message ) : void {
91
115
this . newMessages . next ( message ) ;
92
116
}
93
117
118
+ removeAllMessages ( ) {
119
+ this . remove . next ( ) ;
120
+ }
121
+
122
+ removeAllThreadMessages ( thread : Thread ) {
123
+ this . removeThreadMessages . next ( thread ) ;
124
+ }
125
+
94
126
messagesForThreadUser ( thread : Thread , user : User ) : Observable < Message > {
95
127
return this . newMessages
96
128
. filter ( ( message : Message ) => {
0 commit comments