Commit e8beb9c0 authored by David Stainton's avatar David Stainton

Fix shutdown code path

parent f98cb2e4
Pipeline #1008 passed with stage
in 7 minutes and 7 seconds
......@@ -105,7 +105,7 @@ func NewSession(
fatalErrCh: fatalErrCh,
eventCh: channels.NewInfiniteChannel(),
EventSink: make(chan Event),
opCh: make(chan workerOp),
opCh: make(chan workerOp, 8),
egressQueue: new(Queue),
}
// Configure and bring up the minclient instance.
......@@ -246,13 +246,22 @@ func (s *Session) GetService(serviceName string) (*utils.ServiceDescriptor, erro
func (s *Session) onConnection(err error) {
s.log.Debugf("onConnection %v", err)
s.eventCh.In() <- &ConnectionStatusEvent{
select {
case <-s.HaltCh():
s.log.Debugf("onConnection callback terminating gracefully")
return
case s.eventCh.In() <- &ConnectionStatusEvent{
IsConnected: err == nil,
Err: err,
}:
}
s.opCh <- opConnStatusChanged{
select {
case <-s.HaltCh():
s.log.Debugf("onConnection callback terminating gracefully")
return
case s.opCh <- opConnStatusChanged{
isConnected: err == nil,
}:
}
}
......@@ -329,8 +338,7 @@ func (s *Session) GetPandaConfig() *config.Panda {
}
func (s *Session) Shutdown() {
s.Halt()
s.minclient.Shutdown()
s.minclient.Wait()
s.Halt()
s.log.Info("Session shutdown completed.")
}
// worker.go - mixnet client worker
// Copyright (C) 2018, 2019 David Stainton.
// Copyright (C) 2018, 2019 David Stainton
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
......@@ -28,8 +28,6 @@ import (
type workerOp interface{}
type opIsEmpty struct{}
type opConnStatusChanged struct {
isConnected bool
}
......@@ -77,7 +75,6 @@ func (s *Session) worker() {
return
}
// λP
lambdaP := doc.LambdaP
lambdaPMsec := uint64(rand.Exp(mRng, lambdaP))
if lambdaPMsec > doc.LambdaPMaxDelay {
......@@ -87,7 +84,6 @@ func (s *Session) worker() {
lambdaPTimer := time.NewTimer(lambdaPInterval)
defer lambdaPTimer.Stop()
// λL
lambdaL := doc.LambdaL
lambdaLMsec := uint64(rand.Exp(mRng, lambdaL))
if lambdaLMsec > doc.LambdaLMaxDelay {
......@@ -97,7 +93,10 @@ func (s *Session) worker() {
lambdaLTimer := time.NewTimer(lambdaLInterval)
defer lambdaLTimer.Stop()
defer s.log.Debug("session worker halted")
isConnected := true
mustResetBothTimers := false
for {
var lambdaPFired bool
var lambdaLFired bool
......@@ -113,23 +112,14 @@ func (s *Session) worker() {
case qo = <-s.opCh:
}
if lambdaPFired {
if isConnected {
s.sendFromQueueOrDecoy()
}
}
if lambdaLFired {
if isConnected && !s.cfg.Debug.DisableDecoyTraffic {
s.sendLoopDecoy()
}
}
if qo != nil {
switch op := qo.(type) {
case opIsEmpty:
// XXX do periodic cleanup here
continue
case opConnStatusChanged:
isConnected = s.connStatusChange(op)
newConnectedStatus := s.connStatusChange(op)
if newConnectedStatus != isConnected {
mustResetBothTimers = true
}
isConnected = newConnectedStatus
case opNewDocument:
s.setPollingInterval(op.doc)
err := s.isDocValid(doc)
......@@ -138,9 +128,18 @@ func (s *Session) worker() {
}
lambdaP = doc.LambdaP
lambdaL = doc.LambdaL
mustResetBothTimers = true
default:
s.log.Warningf("BUG: Worker received nonsensical op: %T", op)
} // end of switch
} else {
if isConnected {
if lambdaPFired {
s.sendFromQueueOrDecoy()
} else if lambdaLFired && !s.cfg.Debug.DisableDecoyTraffic {
s.sendLoopDecoy()
}
}
}
if isConnected {
if lambdaPFired {
......@@ -149,7 +148,6 @@ func (s *Session) worker() {
lambdaPMsec = doc.LambdaPMaxDelay
}
lambdaPInterval = time.Duration(lambdaPMsec) * time.Millisecond
lambdaPTimer.Reset(lambdaPInterval)
}
if lambdaLFired {
lambdaLMsec := uint64(rand.Exp(mRng, lambdaL))
......@@ -157,17 +155,25 @@ func (s *Session) worker() {
lambdaLMsec = doc.LambdaLMaxDelay
}
lambdaLInterval = time.Duration(lambdaLMsec) * time.Millisecond
lambdaLTimer.Reset(lambdaLInterval)
}
} else {
if !lambdaPFired && !lambdaPTimer.Stop() {
<-lambdaPTimer.C
lambdaLInterval = time.Duration(maxDuration)
lambdaPInterval = time.Duration(maxDuration)
}
if mustResetBothTimers {
lambdaPTimer.Reset(lambdaPInterval)
lambdaLTimer.Reset(lambdaLInterval)
mustResetBothTimers = false
} else {
// reset only the timer that fired
if lambdaPFired {
lambdaPTimer.Reset(lambdaPInterval)
continue
}
if !lambdaLFired && !lambdaLTimer.Stop() {
<-lambdaLTimer.C
if lambdaPFired {
lambdaLTimer.Reset(lambdaLInterval)
}
lambdaPTimer.Reset(maxDuration)
lambdaLTimer.Reset(maxDuration)
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment