Commit 74f9aaa5 authored by David Stainton's avatar David Stainton

WIP async and blocking client sends

parent f8cd7a30
......@@ -50,6 +50,10 @@ type Message struct {
// ReplyETA is the expected round trip time to receive a response.
ReplyETA time.Duration
// IsBlocking indicates whether or not the client is blocking on the
// sending of the query and the receiving of it's reply.
IsBlocking bool
// SURBID is the SURB identifier.
SURBID *[sConstants.SURBIDLength]byte
......
......@@ -80,9 +80,19 @@ func (s *Session) doSend(msg *Message) error {
msg.ReplyETA = eta
s.surbIDMap.Store(surbID, msg)
}
s.eventCh.In() <- &MessageSentEvent{
MessageID: msg.ID,
Err: nil, // XXX
if msg.IsBlocking {
val, ok := s.messageSentWaitChanMap.Load(*msg.ID)
if !ok {
err := fmt.Errorf("Impossible failure, sentWaitChan not found for message ID %x", *msg.ID)
s.log.Error(err.Error())
s.fatalErrCh <- err
}
sentWaitChan <- msg
} else {
s.eventCh.In() <- &MessageSentEvent{
MessageID: msg.ID,
Err: nil, // XXX send error
}
}
return nil
}
......@@ -136,7 +146,7 @@ func (s *Session) sendDropDecoy() error {
return s.doSend(msg)
}
func (s *Session) composeMessage(recipient, provider string, message []byte) (*Message, error) {
func (s *Session) composeMessage(recipient, provider string, message []byte, isBlocking bool) (*Message, error) {
s.log.Debug("SendMessage")
if len(message) > constants.UserForwardPayloadLength-4 {
return nil, fmt.Errorf("invalid message size: %v", len(message))
......@@ -150,19 +160,20 @@ func (s *Session) composeMessage(recipient, provider string, message []byte) (*M
return nil, err
}
var msg = Message{
ID: &id,
Recipient: recipient,
Provider: provider,
Payload: payload[:],
WithSURB: true,
ID: &id,
Recipient: recipient,
Provider: provider,
Payload: payload[:],
WithSURB: true,
SURBType: cConstants.SurbTypeKaetzchen,
IsBlocking: isBlocking,
}
msg.SURBType = cConstants.SurbTypeKaetzchen
return &msg, nil
}
// SendUnreliableMessage asynchronously sends message without any automatic retransmissions.
func (s *Session) SendUnreliableMessage(recipient, provider string, message []byte) (MessageID, error) {
msg, err := s.composeMessage(recipient, provider, message)
msg, err := s.composeMessage(recipient, provider, message, false)
if err != nil {
return nil, err
}
......@@ -174,12 +185,32 @@ func (s *Session) SendUnreliableMessage(recipient, provider string, message []by
return msg.ID, nil
}
// WaitForReply blocks until a reply is received or the
// round trip timeout is reached.
func (s *Session) WaitForReply(id MessageID) ([]byte, error) {
s.awaitReplyMap
// XXX fix me
return nil, nil
func (s *Session) BlockingSendUnreliableMessage(recipient, provider string, message []byte) ([]byte, error) {
msg, err := s.composeMessage(recipient, provider, message, true)
if err != nil {
return nil, err
}
s.messageIDMap.Store(*msg.ID, msg)
sentWaitChan := make(chan interface{})
s.messageSentWaitChanMap.Store(*msg.ID, sentWaitChan)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
// wait until sent so that we know the ReplyETA for the waiting below
sentMessage := <-sentWaitChan
// wait for reply or round trip timeout
replyWaitChan, ok := s.replyWaitChanMap.Load()
if !ok {
err := fmt.Errorf("Impossible failure, replyWaitChan not found for message ID %x", *msg.ID)
s.log.Error(err.Error())
s.fatalErrCh <- err
}
select {
case reply := <-replyWaitChan:
return reply, nil
case <-time.After(sentMessage.ReplyETA + roundTripTimeSlop):
return nil, ReplyTimeoutError
}
// unreachable
}
......@@ -152,22 +152,6 @@ func (s *Session) eventSinkWorker() {
case <-s.HaltCh():
return
case e := <-s.eventCh.Out():
switch event := e.(type) {
case *ConnectionStatusEvent:
// XXX fix me
case *MessageReplyEvent:
_, ok := s.awaitReplyMap.Load(*e.MessageID)
if ok {
}
case *MessageSentEvent:
// XXX fix me
default:
err := errors.New("Aborting, received unknown event which should be impossible.")
s.log.Error(err.Error())
s.fatalErrCh <- err
return
}
s.EventSink <- e.(Event)
}
}
......@@ -270,10 +254,14 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
}
switch msg.SURBType {
case cConstants.SurbTypeKaetzchen, cConstants.SurbTypeInternal:
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
if msg.IsBlocking {
// XXX fix me
} else {
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
}
}
default:
s.log.Warningf("Discarding SURB %v: Unknown type: 0x%02x", idStr, msg.SURBType)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment