Commit f98cb2e4 authored by David Stainton's avatar David Stainton

Remove dumb poisson abstraction

parent 6620f9fb
Pipeline #1006 passed with stage
in 7 minutes and 3 seconds
// timer.go - Poisson timer.
// Copyright (C) 2018 David Stainton.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package poisson
import (
"math"
mrand "math/rand"
"time"
"github.com/katzenpost/core/crypto/rand"
)
// Descriptor describes a Poisson process.
type Descriptor struct {
Lambda float64
Max uint64
}
// Equals return true if the given Descriptor s is
// equal to d.
func (d *Descriptor) Equals(s *Descriptor) bool {
if d.Lambda != s.Lambda {
return false
}
if d.Max != s.Max {
return false
}
return true
}
// Fount is used to produce channel events after delays
// selected from a Poisson process.
type Fount struct {
Timer *time.Timer
rng *mrand.Rand
desc *Descriptor
}
// DescriptorEquals returns true if the Fount's Poisson descriptor
// is equal to the given Poisson descriptor s.
func (t *Fount) DescriptorEquals(s *Descriptor) bool {
return t.desc.Equals(s)
}
// SetPoisson sets a new Poisson descriptor if the
// descriptor is different than the one we have set already.
func (t *Fount) SetPoisson(desc *Descriptor) {
if !t.DescriptorEquals(desc) {
if !t.Timer.Stop() {
<-t.Timer.C
}
t.desc = desc
t.Next()
}
}
func (t *Fount) nextInterval() time.Duration {
wakeMsec := uint64(rand.Exp(t.rng, t.desc.Lambda))
switch {
case wakeMsec > t.desc.Max:
wakeMsec = t.desc.Max
default:
}
wakeInterval := time.Duration(wakeMsec) * time.Millisecond
return wakeInterval
}
func (t *Fount) Channel() <-chan time.Time {
return t.Timer.C
}
// Next resets the timer to the next Poisson process value.
// This MUST NOT be called unless the timer has fired.
func (t *Fount) Next() {
wakeInterval := t.nextInterval()
t.Timer.Reset(wakeInterval)
}
// NextMax resets the timer to the maximum
// possible value.
func (t *Fount) NextMax() {
if !t.Timer.Stop() {
<-t.Timer.C
}
t.Timer.Reset(math.MaxInt64)
}
// Start is used to initialize and start the timer
// after timer creation.
func (t *Fount) Start() {
wakeInterval := t.nextInterval()
t.Timer = time.NewTimer(wakeInterval)
}
// Stop stops the timer.
func (t *Fount) Stop() {
if !t.Timer.Stop() {
<-t.Timer.C
}
}
// NewTimer is used to create a new Fount. A subsequent
// call to the Start method is used to activate the timer.
func NewTimer(desc *Descriptor) *Fount {
t := &Fount{
rng: rand.NewMath(),
desc: desc,
}
return t
}
......@@ -29,7 +29,6 @@ import (
"github.com/katzenpost/client/config"
cConstants "github.com/katzenpost/client/constants"
"github.com/katzenpost/client/internal/pkiclient"
"github.com/katzenpost/client/poisson"
"github.com/katzenpost/client/utils"
coreConstants "github.com/katzenpost/core/constants"
"github.com/katzenpost/core/crypto/ecdh"
......@@ -58,11 +57,6 @@ type Session struct {
eventCh channels.Channel
EventSink chan Event
// λP
pTimer *poisson.Fount
// λL
lTimer *poisson.Fount
linkKey *ecdh.PrivateKey
onlineAt time.Time
hasPKIDoc bool
......@@ -141,11 +135,10 @@ func NewSession(
// block until we get the first PKI document
// and then set our timers accordingly
doc, err := s.awaitFirstPKIDoc(ctx)
err = s.awaitFirstPKIDoc(ctx)
if err != nil {
return nil, err
}
s.setTimers(doc)
s.Go(s.worker)
return s, nil
}
......@@ -203,17 +196,17 @@ func (s *Session) garbageCollect() {
s.surbIDMap.Range(surbIDMapRange)
}
func (s *Session) awaitFirstPKIDoc(ctx context.Context) (*pki.Document, error) {
func (s *Session) awaitFirstPKIDoc(ctx context.Context) error {
for {
var qo workerOp
select {
case <-ctx.Done():
return nil, ctx.Err()
return ctx.Err()
case <-s.HaltCh():
s.log.Debugf("Await first pki doc worker terminating gracefully")
return nil, errors.New("terminating gracefully")
return errors.New("terminating gracefully")
case <-time.After(time.Duration(s.cfg.Debug.InitialMaxPKIRetrievalDelay) * time.Second):
return nil, errors.New("timeout failure awaiting first PKI document")
return errors.New("timeout failure awaiting first PKI document")
case qo = <-s.opCh:
}
switch op := qo.(type) {
......@@ -221,16 +214,17 @@ func (s *Session) awaitFirstPKIDoc(ctx context.Context) (*pki.Document, error) {
// Determine if PKI doc is valid. If not then abort.
err := s.isDocValid(op.doc)
if err != nil {
err := fmt.Errorf("aborting, PKI doc is not valid for the Loopix decoy traffic use case: %v", err)
err := fmt.Errorf("aborting, PKI doc is not valid for our decoy traffic use case: %v", err)
s.log.Error(err.Error())
s.fatalErrCh <- err
return nil, err
return err
}
return op.doc, nil
return nil
default:
continue
}
}
// NOT REACHED
}
// GetService returns a randomly selected service
......
......@@ -18,11 +18,11 @@ package client
import (
"errors"
"fmt"
"math"
"time"
"github.com/katzenpost/client/constants"
"github.com/katzenpost/client/poisson"
"github.com/katzenpost/core/crypto/rand"
"github.com/katzenpost/core/pki"
)
......@@ -45,30 +45,6 @@ func (s *Session) setPollingInterval(doc *pki.Document) {
s.minclient.SetPollInterval(interval)
}
func (s *Session) setTimers(doc *pki.Document) {
// λP
pDesc := &poisson.Descriptor{
Lambda: doc.LambdaP,
Max: doc.LambdaPMaxDelay,
}
if s.pTimer == nil {
s.pTimer = poisson.NewTimer(pDesc)
} else {
s.pTimer.SetPoisson(pDesc)
}
// λL
lDesc := &poisson.Descriptor{
Lambda: doc.LambdaL,
Max: doc.LambdaLMaxDelay,
}
if s.lTimer == nil {
s.lTimer = poisson.NewTimer(lDesc)
} else {
s.lTimer.SetPoisson(lDesc)
}
}
func (s *Session) connStatusChange(op opConnStatusChanged) bool {
isConnected := op.isConnected
if isConnected {
......@@ -90,32 +66,38 @@ func (s *Session) connStatusChange(op opConnStatusChanged) bool {
return isConnected
}
func (s *Session) maybeUpdateTimers(doc *pki.Document) {
// Determine if PKI doc is valid. If not then abort.
err := s.isDocValid(doc)
if err != nil {
err := fmt.Errorf("aborting, PKI doc is not valid for the Loopix decoy traffic use case: %v", err)
s.log.Error(err.Error())
s.fatalErrCh <- err
func (s *Session) worker() {
const maxDuration = math.MaxInt64
mRng := rand.NewMath()
// The PKI doc should be cached since we've
// already waited until we received it.
doc := s.minclient.CurrentDocument()
if doc == nil {
s.fatalErrCh <- errors.New("aborting, PKI doc is nil")
return
}
s.setTimers(doc)
}
// worker performs work. It runs in it's own goroutine
// and implements a shutdown code path as well.
// This function assumes the timers are setup but
// not yet started.
func (s *Session) worker() {
s.pTimer.Start()
s.lTimer.Start()
defer func() {
s.pTimer.Stop()
s.lTimer.Stop()
s.log.Debug("Session timers stopped.")
}()
var isConnected bool
// λP
lambdaP := doc.LambdaP
lambdaPMsec := uint64(rand.Exp(mRng, lambdaP))
if lambdaPMsec > doc.LambdaPMaxDelay {
lambdaPMsec = doc.LambdaPMaxDelay
}
lambdaPInterval := time.Duration(lambdaPMsec) * time.Millisecond
lambdaPTimer := time.NewTimer(lambdaPInterval)
defer lambdaPTimer.Stop()
// λL
lambdaL := doc.LambdaL
lambdaLMsec := uint64(rand.Exp(mRng, lambdaL))
if lambdaLMsec > doc.LambdaLMaxDelay {
lambdaLMsec = doc.LambdaLMaxDelay
}
lambdaLInterval := time.Duration(lambdaLMsec) * time.Millisecond
lambdaLTimer := time.NewTimer(lambdaLInterval)
defer lambdaLTimer.Stop()
isConnected := true
for {
var lambdaPFired bool
var lambdaLFired bool
......@@ -124,9 +106,9 @@ func (s *Session) worker() {
case <-s.HaltCh():
s.log.Debugf("Session worker terminating gracefully.")
return
case <-s.pTimer.Timer.C:
case <-lambdaPTimer.C:
lambdaPFired = true
case <-s.lTimer.Timer.C:
case <-lambdaLTimer.C:
lambdaLFired = true
case qo = <-s.opCh:
}
......@@ -150,21 +132,42 @@ func (s *Session) worker() {
isConnected = s.connStatusChange(op)
case opNewDocument:
s.setPollingInterval(op.doc)
s.maybeUpdateTimers(op.doc)
err := s.isDocValid(doc)
if err != nil {
s.fatalErrCh <- err
}
lambdaP = doc.LambdaP
lambdaL = doc.LambdaL
default:
s.log.Warningf("BUG: Worker received nonsensical op: %T", op)
} // end of switch
}
if isConnected {
if lambdaPFired {
s.pTimer.Next()
lambdaPMsec := uint64(rand.Exp(mRng, lambdaP))
if lambdaPMsec > doc.LambdaPMaxDelay {
lambdaPMsec = doc.LambdaPMaxDelay
}
lambdaPInterval = time.Duration(lambdaPMsec) * time.Millisecond
lambdaPTimer.Reset(lambdaPInterval)
}
if lambdaLFired {
s.lTimer.Next()
lambdaLMsec := uint64(rand.Exp(mRng, lambdaL))
if lambdaLMsec > doc.LambdaLMaxDelay {
lambdaLMsec = doc.LambdaLMaxDelay
}
lambdaLInterval = time.Duration(lambdaLMsec) * time.Millisecond
lambdaLTimer.Reset(lambdaLInterval)
}
} else {
s.pTimer.NextMax()
s.lTimer.NextMax()
if !lambdaPFired && !lambdaPTimer.Stop() {
<-lambdaPTimer.C
}
if !lambdaLFired && !lambdaLTimer.Stop() {
<-lambdaLTimer.C
}
lambdaPTimer.Reset(maxDuration)
lambdaLTimer.Reset(maxDuration)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment