Commit 0af6311c authored by David Stainton's avatar David Stainton

WIP

parent fb47abdd
......@@ -12,5 +12,6 @@ require (
github.com/stretchr/testify v1.4.0
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80
golang.org/x/text v0.3.2
gopkg.in/eapache/channels.v1 v1.1.0
gopkg.in/op/go-logging.v1 v1.0.0-20160211212156-b2cb9fa56473
)
......@@ -34,93 +34,6 @@ const roundTripTimeSlop = time.Duration(88 * time.Second)
var ReplyTimeoutError = errors.New("Failure waiting for reply, timeout reached")
// WaitForSent blocks until the message with the corresponding message ID is sent.
func (s *Session) waitForSent(id MessageID) error {
s.log.Debug("Waiting for message to be sent.")
var waitCh chan Event
var err error
s.mapLock.Lock()
msg, ok := s.messageIDMap[*id]
if !ok {
err = fmt.Errorf("[%v] Failure waiting for reply, invalid message ID", id)
} else {
if msg.Sent {
s.mapLock.Unlock()
return nil
}
}
waitCh, ok = s.waitSentChans[*id]
s.mapLock.Unlock()
if ok {
defer func() {
s.mapLock.Lock()
delete(s.waitSentChans, *id)
s.mapLock.Unlock()
}()
} else {
err = fmt.Errorf("[%v] Failure waiting for reply, invalid message ID", id)
}
if err != nil {
return err
}
select {
case <-waitCh:
case <-time.After(1 * time.Minute):
return fmt.Errorf("[%v] Failure waiting for reply, timeout", id)
}
s.log.Debug("Finished waiting. Message was sent.")
return nil
}
// WaitForReply blocks until a reply is received.
func (s *Session) WaitForReply(id MessageID) ([]byte, error) {
s.log.Debugf("WaitForReply message ID: %x\n", *id)
err := s.waitForSent(id)
if err != nil {
return nil, err
}
s.mapLock.Lock()
waitCh, ok := s.waitChans[*id]
if ok {
defer func() {
s.mapLock.Lock()
delete(s.waitChans, *id)
s.mapLock.Unlock()
}()
} else {
err = fmt.Errorf("[%v] Failure waiting for reply, invalid message ID", id)
}
msg, ok := s.messageIDMap[*id]
if ok {
// XXX Consider what will happen because of this deletion
// when we implement an ARQ based reliability.
defer func() {
s.mapLock.Lock()
delete(s.messageIDMap, *id)
s.mapLock.Unlock()
}()
} else {
err = fmt.Errorf("[%v] Failure waiting for reply, invalid message ID", id)
}
s.log.Debug("reply eta is %v", msg.ReplyETA)
s.mapLock.Unlock()
if err != nil {
return nil, err
}
select {
case event := <-waitCh:
e, ok := event.(*MessageReplyEvent)
if !ok {
s.log.Debug("UNKNOWN EVENT TYPE FOUND IN WAIT CHANNEL FOR THE GIVEN MESSAGE ID.")
}
return e.Payload, nil
case <-time.After(msg.ReplyETA + roundTripTimeSlop):
return nil, ReplyTimeoutError
}
// unreachable
}
func (s *Session) sendNext() error {
msg, err := s.egressQueue.Peek()
if err != nil {
......@@ -159,29 +72,17 @@ func (s *Session) doSend(msg *Message) error {
if err != nil {
return err
}
s.mapLock.Lock()
if msg.WithSURB {
s.log.Debugf("doSend setting ReplyETA to %v", eta)
msg.Key = key
msg.SentAt = time.Now()
msg.Sent = true
msg.ReplyETA = eta
s.surbIDMap[surbID] = msg
s.surbIDMap.Store(surbID, msg)
}
eventCh, ok := s.waitSentChans[*msg.ID]
s.mapLock.Unlock()
if ok {
select {
case eventCh <- &MessageSentEvent{
MessageID: msg.ID,
Err: nil,
}:
case <-time.After(3 * time.Second):
s.log.Debug("timeout reached when attempting to sent to waitSentChans")
break
}
} else {
s.log.Debug("no waitSentChans map entry found for that message ID")
s.eventCh.In() <- &MessageSentEvent{
MessageID: msg.ID,
Err: nil, // XXX
}
return nil
}
......@@ -265,16 +166,17 @@ func (s *Session) SendUnreliableMessage(recipient, provider string, message []by
if err != nil {
return nil, err
}
s.mapLock.Lock()
s.messageIDMap[*msg.ID] = msg
s.waitChans[*msg.ID] = make(chan Event)
s.waitSentChans[*msg.ID] = make(chan Event)
s.mapLock.Unlock()
s.messageIDMap.Store(*msg.ID, msg)
err = s.egressQueue.Push(msg)
if err != nil {
return nil, err
}
return msg.ID, nil
}
// WaitForReply blocks until a reply is received or the
// round trip timeout is reached.
func (s *Session) WaitForReply(id MessageID) ([]byte, error) {
// XXX fix me
return nil, nil
}
......@@ -39,6 +39,7 @@ import (
sConstants "github.com/katzenpost/core/sphinx/constants"
"github.com/katzenpost/core/worker"
"github.com/katzenpost/minclient"
"gopkg.in/eapache/channels.v1"
"gopkg.in/op/go-logging.v1"
)
......@@ -52,6 +53,10 @@ type Session struct {
log *logging.Logger
fatalErrCh chan error
opCh chan workerOp
eventCh channels.Channel
EventSink chan Event
// λP
pTimer *poisson.Fount
......@@ -61,17 +66,14 @@ type Session struct {
lTimer *poisson.Fount
linkKey *ecdh.PrivateKey
opCh chan workerOp
onlineAt time.Time
hasPKIDoc bool
egressQueue EgressQueue
waitSentChans map[[cConstants.MessageIDLength]byte]chan Event
waitChans map[[cConstants.MessageIDLength]byte]chan Event
surbIDMap map[[sConstants.SURBIDLength]byte]*Message
messageIDMap map[[cConstants.MessageIDLength]byte]*Message
mapLock *sync.Mutex
surbIDMap sync.Map // [sConstants.SURBIDLength]byte -> *Message
messageIDMap sync.Map // [cConstants.MessageIDLength]byte -> *Message
awaitReplyMap sync.Map // MessageID -> bool
decoyLoopTally uint64
}
......@@ -99,20 +101,16 @@ func New(ctx context.Context, fatalErrCh chan error, logBackend *log.Backend, cf
log := logBackend.GetLogger(fmt.Sprintf("%s@%s_client", cfg.Account.User, cfg.Account.Provider))
s := &Session{
cfg: cfg,
linkKey: linkKey,
pkiClient: pkiClient,
log: log,
fatalErrCh: fatalErrCh,
opCh: make(chan workerOp),
waitChans: make(map[[sConstants.SURBIDLength]byte]chan Event),
waitSentChans: make(map[[cConstants.MessageIDLength]byte]chan Event),
cfg: cfg,
linkKey: linkKey,
pkiClient: pkiClient,
log: log,
fatalErrCh: fatalErrCh,
eventCh: channels.NewInfiniteChannel(),
EventSink: make(chan Event),
opCh: make(chan workerOp),
egressQueue: new(Queue),
}
s.surbIDMap = make(map[[sConstants.SURBIDLength]byte]*Message)
s.messageIDMap = make(map[[cConstants.MessageIDLength]byte]*Message)
s.mapLock = new(sync.Mutex)
s.egressQueue = new(Queue)
// Configure and bring up the minclient instance.
clientCfg := &minclient.ClientConfig{
User: cfg.Account.User,
......@@ -130,6 +128,8 @@ func New(ctx context.Context, fatalErrCh chan error, logBackend *log.Backend, cf
EnableTimeSync: false, // Be explicit about it.
}
s.Go(s.eventSinkWorker)
s.minclient, err = minclient.New(clientCfg)
if err != nil {
return nil, err
......@@ -146,6 +146,30 @@ func New(ctx context.Context, fatalErrCh chan error, logBackend *log.Backend, cf
return s, nil
}
func (s *Session) eventSinkWorker() {
for {
select {
case <-s.HaltCh():
return
case e := <-s.eventCh.Out():
switch event := e.(type) {
case *ConnectionStatusEvent:
// XXX fix me
case *MessageReplyEvent:
s.awaitReplyMap.Load(*e.MessageID)
case *MessageSentEvent:
// XXX fix me
default:
err := errors.New("Aborting, received unknown event which should be impossible.")
s.log.Error(err.Error())
s.fatalErrCh <- err
return
}
s.EventSink <- e.(Event)
}
}
}
func (s *Session) awaitFirstPKIDoc(ctx context.Context) (*pki.Document, error) {
for {
var qo workerOp
......@@ -220,13 +244,13 @@ func (s *Session) decrementDecoyLoopTally() {
func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte) error {
idStr := fmt.Sprintf("[%v]", hex.EncodeToString(surbID[:]))
s.log.Infof("OnACK with SURBID %x", idStr)
s.mapLock.Lock()
defer s.mapLock.Unlock()
msg, ok := s.surbIDMap[*surbID]
rawMessage, ok := s.surbIDMap.Load(surbID)
if !ok {
s.log.Debug("Strange, received reply with unexpected SURBID")
return nil
}
msg := rawMessage.(*Message)
plaintext, err := sphinx.DecryptSURBPayload(ciphertext, msg.Key)
if err != nil {
s.log.Infof("SURB Reply decryption failure: %s", err)
......@@ -237,30 +261,16 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
return nil
}
if msg.WithSURB && msg.IsDecoy {
_, ok := s.surbIDMap[*surbID]
if ok {
s.decrementDecoyLoopTally()
delete(s.surbIDMap, *surbID)
return nil
}
s.log.Warning("Reply is from an unknown Decoy Loop Message.")
s.decrementDecoyLoopTally()
s.surbIDMap.Delete(*surbID)
return nil
}
switch msg.SURBType {
case cConstants.SurbTypeKaetzchen, cConstants.SurbTypeInternal:
waitCh, ok := s.waitChans[*msg.ID]
if ok {
select {
case waitCh <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
}:
case <-time.After(3 * time.Second):
s.log.Warning("Message Reply Event send timeout failure.")
}
} else {
s.log.Warning("Error, failure to find wait chan in waitChans map for that message ID.")
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
}
default:
s.log.Warningf("Discarding SURB %v: Unknown type: 0x%02x", idStr, msg.SURBType)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment