Commit 11b022d4 authored by David Stainton's avatar David Stainton

Fix BlockingSendUnreliableMessage memory leaks

parent 1dc6e4ba
Pipeline #988 passed with stage
in 7 minutes and 22 seconds
......@@ -176,23 +176,26 @@ func (s *Session) BlockingSendUnreliableMessage(recipient, provider string, mess
if err != nil {
return nil, err
}
sentWaitChan := make(chan *Message)
s.sentWaitChanMap.Store(*msg.ID, sentWaitChan)
defer s.sentWaitChanMap.Delete(*msg.ID)
replyWaitChan := make(chan []byte)
s.replyWaitChanMap.Store(*msg.ID, replyWaitChan)
defer s.replyWaitChanMap.Delete(*msg.ID)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
// wait until sent so that we know the ReplyETA for the waiting below
sentMessage := <-sentWaitChan
s.sentWaitChanMap.Delete(*msg.ID)
// wait for reply or round trip timeout
select {
case reply := <-replyWaitChan:
s.replyWaitChanMap.Delete(*msg.ID)
return reply, nil
case <-time.After(sentMessage.ReplyETA + roundTripTimeSlop):
return nil, ReplyTimeoutError
......
......@@ -264,12 +264,12 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
case cConstants.SurbTypeKaetzchen, cConstants.SurbTypeInternal:
if msg.IsBlocking {
replyWaitChanRaw, ok := s.replyWaitChanMap.Load(*msg.ID)
replyWaitChan := replyWaitChanRaw.(chan []byte)
if !ok {
err := fmt.Errorf("Impossible failure to acquire replyWaitChan for message ID %x", msg.ID)
err := fmt.Errorf("BUG, failure to acquire replyWaitChan for message ID %x", msg.ID)
s.fatalErrCh <- err
return err
}
replyWaitChan := replyWaitChanRaw.(chan []byte)
replyWaitChan <- plaintext[2:]
} else {
s.eventCh.In() <- &MessageReplyEvent{
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment