Commit fa1488e5 authored by David Stainton's avatar David Stainton

Add untested BlockingSendUnreliableMessage

parent 74f9aaa5
......@@ -81,12 +81,13 @@ func (s *Session) doSend(msg *Message) error {
s.surbIDMap.Store(surbID, msg)
}
if msg.IsBlocking {
val, ok := s.messageSentWaitChanMap.Load(*msg.ID)
sentWaitChanRaw, ok := s.sentWaitChanMap.Load(*msg.ID)
if !ok {
err := fmt.Errorf("Impossible failure, sentWaitChan not found for message ID %x", *msg.ID)
s.log.Error(err.Error())
s.fatalErrCh <- err
}
sentWaitChan := sentWaitChanRaw.(chan *Message)
sentWaitChan <- msg
} else {
s.eventCh.In() <- &MessageSentEvent{
......@@ -177,7 +178,6 @@ func (s *Session) SendUnreliableMessage(recipient, provider string, message []by
if err != nil {
return nil, err
}
s.messageIDMap.Store(*msg.ID, msg)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
......@@ -190,24 +190,24 @@ func (s *Session) BlockingSendUnreliableMessage(recipient, provider string, mess
if err != nil {
return nil, err
}
s.messageIDMap.Store(*msg.ID, msg)
sentWaitChan := make(chan interface{})
s.messageSentWaitChanMap.Store(*msg.ID, sentWaitChan)
s.sentWaitChanMap.Store(*msg.ID, sentWaitChan)
replyWaitChan := make(chan []byte)
s.replyWaitChanMap.Store(*msg.ID, replyWaitChan)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
// wait until sent so that we know the ReplyETA for the waiting below
sentMessage := <-sentWaitChan
sentMessageRaw := <-sentWaitChan
sentMessage := sentMessageRaw.(*Message)
s.sentWaitChanMap.Delete(*msg.ID)
// wait for reply or round trip timeout
replyWaitChan, ok := s.replyWaitChanMap.Load()
if !ok {
err := fmt.Errorf("Impossible failure, replyWaitChan not found for message ID %x", *msg.ID)
s.log.Error(err.Error())
s.fatalErrCh <- err
}
select {
case reply := <-replyWaitChan:
s.replyWaitChanMap.Delete(*msg.ID)
return reply, nil
case <-time.After(sentMessage.ReplyETA + roundTripTimeSlop):
return nil, ReplyTimeoutError
......
......@@ -71,9 +71,9 @@ type Session struct {
egressQueue EgressQueue
surbIDMap sync.Map // [sConstants.SURBIDLength]byte -> *Message
messageIDMap sync.Map // [cConstants.MessageIDLength]byte -> *Message
awaitReplyMap sync.Map // MessageID -> bool
surbIDMap sync.Map // [sConstants.SURBIDLength]byte -> *Message
sentWaitChanMap sync.Map // MessageID -> chan *Message
replyWaitChanMap sync.Map // MessageID -> chan []byte
decoyLoopTally uint64
}
......@@ -237,6 +237,7 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
s.log.Debug("Strange, received reply with unexpected SURBID")
return nil
}
s.surbIDMap.Delete(*surbID)
msg := rawMessage.(*Message)
plaintext, err := sphinx.DecryptSURBPayload(ciphertext, msg.Key)
if err != nil {
......@@ -249,13 +250,19 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
}
if msg.WithSURB && msg.IsDecoy {
s.decrementDecoyLoopTally()
s.surbIDMap.Delete(*surbID)
return nil
}
switch msg.SURBType {
case cConstants.SurbTypeKaetzchen, cConstants.SurbTypeInternal:
if msg.IsBlocking {
// XXX fix me
replyWaitChanRaw, ok := s.replyWaitChanMap.Load(*msg.ID)
replyWaitChan := replyWaitChanRaw.(chan []byte)
if !ok {
err := fmt.Errorf("Impossible failure to acquire replyWaitChan for message ID %x", msg.ID)
s.fatalErrCh <- err
return err
}
replyWaitChan <- plaintext[2:]
} else {
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment