Commit d21e876a authored by David Stainton's avatar David Stainton

Garbage collect SURB ID Map

parent 11b022d4
Pipeline #989 passed with stage
in 7 minutes and 5 seconds
......@@ -44,9 +44,6 @@ type Message struct {
// SentAt contains the time the message was sent.
SentAt time.Time
// Sent is set to true if the message was sent on the network.
Sent bool
// ReplyETA is the expected round trip time to receive a response.
ReplyETA time.Duration
......@@ -63,9 +60,6 @@ type Message struct {
// Reply is the SURB reply
Reply []byte
// SURBType is the SURB type.
SURBType int
// WithSURB specified if a SURB should be bundled with the forward payload.
WithSURB bool
......
......@@ -80,7 +80,6 @@ func (s *Session) doSend(msg *Message) {
s.log.Debugf("doSend setting ReplyETA to %v", eta)
msg.Key = key
msg.SentAt = time.Now()
msg.Sent = true
msg.ReplyETA = eta
s.surbIDMap.Store(surbID, msg)
}
......@@ -152,7 +151,6 @@ func (s *Session) composeMessage(recipient, provider string, message []byte, isB
Provider: provider,
Payload: payload[:],
WithSURB: true,
SURBType: cConstants.SurbTypeKaetzchen,
IsBlocking: isBlocking,
}
return &msg, nil
......
......@@ -27,7 +27,6 @@ import (
"time"
"github.com/katzenpost/client/config"
cConstants "github.com/katzenpost/client/constants"
"github.com/katzenpost/client/internal/pkiclient"
"github.com/katzenpost/client/poisson"
"github.com/katzenpost/client/utils"
......@@ -127,6 +126,7 @@ func New(ctx context.Context, fatalErrCh chan error, logBackend *log.Backend, cf
}
s.Go(s.eventSinkWorker)
s.Go(s.garbageCollectionWorker)
s.minclient, err = minclient.New(clientCfg)
if err != nil {
......@@ -161,6 +161,42 @@ func (s *Session) eventSinkWorker() {
}
}
func (s *Session) garbageCollectionWorker() {
const garbageCollectionInterval = 10 * time.Minute
timer := time.NewTimer(garbageCollectionInterval)
defer timer.Stop()
for {
select {
case <-s.HaltCh():
s.log.Debugf("Garbage collection worker terminating gracefully.")
return
case <-timer.C:
s.garbageCollect()
timer.Reset(garbageCollectionInterval)
}
}
}
func (s *Session) garbageCollect() {
// [sConstants.SURBIDLength]byte -> *Message
surbIDMapRange := func(rawSurbID, rawMessage interface{}) bool {
surbID := rawSurbID.([sConstants.SURBIDLength]byte)
message := rawMessage.(*Message)
if message.IsBlocking {
// Blocking sends don't need this garbage collection mechanism
// because the BlockingSendUnreliableMessage method will clean up
// after itself.
return true
}
if time.Now().After(message.SentAt.Add(message.ReplyETA).Add(roundTripTimeSlop)) {
s.log.Debug("Garbage collecting SURB ID Map entry for Message ID %x", message.ID)
s.surbIDMap.Delete(surbID)
}
return true
}
s.surbIDMap.Range(surbIDMapRange)
}
func (s *Session) awaitFirstPKIDoc(ctx context.Context) (*pki.Document, error) {
for {
var qo workerOp
......@@ -242,14 +278,14 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
rawMessage, ok := s.surbIDMap.Load(*surbID)
if !ok {
s.log.Debug("Strange, received reply with unexpected SURBID")
s.log.Debug("BUG, received reply with unexpected SURBID")
return nil
}
s.surbIDMap.Delete(*surbID)
msg := rawMessage.(*Message)
plaintext, err := sphinx.DecryptSURBPayload(ciphertext, msg.Key)
if err != nil {
s.log.Infof("Impossible SURB Reply decryption failure: %s", err)
s.log.Infof("BUG, SURB Reply decryption failure: %s", err)
return err
}
if len(plaintext) != coreConstants.ForwardPayloadLength {
......@@ -260,27 +296,24 @@ func (s *Session) onACK(surbID *[sConstants.SURBIDLength]byte, ciphertext []byte
s.decrementDecoyLoopTally()
return nil
}
switch msg.SURBType {
case cConstants.SurbTypeKaetzchen, cConstants.SurbTypeInternal:
if msg.IsBlocking {
replyWaitChanRaw, ok := s.replyWaitChanMap.Load(*msg.ID)
if !ok {
err := fmt.Errorf("BUG, failure to acquire replyWaitChan for message ID %x", msg.ID)
s.fatalErrCh <- err
return err
}
replyWaitChan := replyWaitChanRaw.(chan []byte)
replyWaitChan <- plaintext[2:]
} else {
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
}
if msg.IsBlocking {
replyWaitChanRaw, ok := s.replyWaitChanMap.Load(*msg.ID)
if !ok {
err := fmt.Errorf("BUG, failure to acquire replyWaitChan for message ID %x", msg.ID)
s.fatalErrCh <- err
return err
}
replyWaitChan := replyWaitChanRaw.(chan []byte)
replyWaitChan <- plaintext[2:]
} else {
s.eventCh.In() <- &MessageReplyEvent{
MessageID: msg.ID,
Payload: plaintext[2:],
Err: nil,
}
default:
s.log.Warningf("Discarding SURB %v: Unknown type: 0x%02x", idStr, msg.SURBType)
}
return nil
}
......
......@@ -144,7 +144,7 @@ func (s *Session) worker() {
if qo != nil {
switch op := qo.(type) {
case opIsEmpty:
// XXX do periodic cleanup here
// I don't think we need to do anything here.
continue
case opConnStatusChanged:
isConnected = s.connStatusChange(op)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment