Commit 28770ae1 authored by David Stainton's avatar David Stainton

Fix shutdown code path

parent 1cf17f86
......@@ -173,7 +173,7 @@ func (c *Client) Wait() {
func (c *Client) halt() {
c.log.Noticef("Starting graceful shutdown.")
if c.session != nil {
c.session.Halt()
c.session.Shutdown()
}
close(c.fatalErrCh)
close(c.haltedCh)
......
......@@ -29,6 +29,25 @@ import (
"github.com/stretchr/testify/require"
)
func TestDockerClientConnectShutdown(t *testing.T) {
require := require.New(t)
cfg, err := config.LoadFile("testdata/client.toml")
require.NoError(err)
cfg, linkKey := AutoRegisterRandomClient(cfg)
client, err := New(cfg)
require.NoError(err)
session, err := client.NewSession(linkKey)
require.NoError(err)
<-session.EventSink
client.Shutdown()
client.Wait()
}
func TestDockerClientBlockingSendReceive(t *testing.T) {
require := require.New(t)
......@@ -48,6 +67,9 @@ func TestDockerClientBlockingSendReceive(t *testing.T) {
reply, err := session.BlockingSendUnreliableMessage(desc.Name, desc.Provider, []byte("hello"))
require.NoError(err)
require.True(utils.CtIsZero(reply))
client.Shutdown()
client.Wait()
}
func TestDockerClientBlockingSendReceiveWithDecoyTraffic(t *testing.T) {
......@@ -70,6 +92,9 @@ func TestDockerClientBlockingSendReceiveWithDecoyTraffic(t *testing.T) {
reply, err := session.BlockingSendUnreliableMessage(desc.Name, desc.Provider, []byte("hello"))
require.NoError(err)
require.True(utils.CtIsZero(reply))
client.Shutdown()
client.Wait()
}
func TestDockerClientAsyncSendReceive(t *testing.T) {
......@@ -115,6 +140,9 @@ func TestDockerClientAsyncSendReceive(t *testing.T) {
}
}()
wg.Wait()
client.Shutdown()
client.Wait()
}
func TestDockerClientAsyncSendReceiveWithDecoyTraffic(t *testing.T) {
......@@ -161,4 +189,7 @@ func TestDockerClientAsyncSendReceiveWithDecoyTraffic(t *testing.T) {
}
}()
wg.Wait()
client.Shutdown()
client.Wait()
}
......@@ -108,7 +108,9 @@ func (t *Fount) Start() {
// Stop stops the timer.
func (t *Fount) Stop() {
t.Timer.Stop()
if !t.Timer.Stop() {
<-t.Timer.C
}
}
// NewTimer is used to create a new Fount. A subsequent
......
......@@ -148,9 +148,15 @@ func (s *Session) eventSinkWorker() {
for {
select {
case <-s.HaltCh():
s.log.Debugf("Event sink worker terminating gracefully.")
return
case e := <-s.eventCh.Out():
s.EventSink <- e.(Event)
select {
case s.EventSink <- e.(Event):
case <-s.HaltCh():
s.log.Debugf("Event sink worker terminating gracefully.")
return
}
}
}
}
......@@ -162,7 +168,7 @@ func (s *Session) awaitFirstPKIDoc(ctx context.Context) (*pki.Document, error) {
case <-ctx.Done():
return nil, ctx.Err()
case <-s.HaltCh():
s.log.Debugf("Terminating gracefully.")
s.log.Debugf("Await first pki doc worker terminating gracefully")
return nil, errors.New("Terminating gracefully.")
case <-time.After(time.Duration(s.cfg.Debug.InitialMaxPKIRetrievalDelay) * time.Second):
return nil, errors.New("Timeout failure awaiting first PKI document.")
......@@ -289,3 +295,10 @@ func (s *Session) onDocument(doc *pki.Document) {
func (s *Session) GetPandaConfig() *config.Panda {
return s.cfg.Panda
}
func (s *Session) Shutdown() {
s.minclient.Shutdown()
s.minclient.Wait()
s.Halt()
s.log.Info("Session shutdown completed.")
}
......@@ -108,9 +108,12 @@ func (s *Session) maybeUpdateTimers(doc *pki.Document) {
// not yet started.
func (s *Session) worker() {
s.pTimer.Start()
defer s.pTimer.Stop()
s.lTimer.Start()
defer s.lTimer.Stop()
defer func() {
s.pTimer.Stop()
s.lTimer.Stop()
s.log.Debug("Session timers stopped.")
}()
var isConnected bool
for {
......@@ -119,7 +122,7 @@ func (s *Session) worker() {
var qo workerOp
select {
case <-s.HaltCh():
s.log.Debugf("Terminating gracefully.")
s.log.Debugf("Session worker terminating gracefully.")
return
case <-s.pTimer.Timer.C:
lambdaPFired = true
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment