Add WaitToConnect

parent b92c3e1b
......@@ -28,15 +28,9 @@ pkiKey = "900895721381C0756D28954524BB1D090F54C8DD9295F84B1D8A93F1E3C17AD8"
client = minclient.NewClient(pkiAddr, pkiKey, minclient.LogConfig())
key = minclient.StringToKey(keyStr)
session = client.NewSession(name, provider, key)
session.WaitToConnect()
sent = False
while not sent:
try:
session.SendMessage("bob", "panoramix.org", "hello bob!!!")
time.sleep(1)
sent = True
except RuntimeError:
pass
session.SendMessage("bob", "panoramix.org", "hello bob!!!")
print("Message sent")
while 1:
......
......@@ -34,9 +34,10 @@ import (
// Session holds the client session
type Session struct {
client *minclient.Client
queue chan string
log *logging.Logger
client *minclient.Client
queue chan string
connected chan bool
log *logging.Logger
// TODO: we'll need to add persistency to the surb keys at some point
surbKeys map[[constants.SURBIDLength]byte][]byte
......@@ -60,6 +61,7 @@ func (client Client) NewSession(user string, provider string, key Key) (Session,
}
session.queue = make(chan string, 100)
session.connected = make(chan bool)
session.surbKeys = make(map[[constants.SURBIDLength]byte][]byte)
session.client, err = minclient.New(clientCfg)
session.log = client.log.GetLogger(fmt.Sprintf("callbacks:%s@%s", user, provider))
......@@ -71,6 +73,11 @@ func (s Session) Shutdown() {
s.client.Shutdown()
}
// WaitToConnect returns when the session is stablished
func (s Session) WaitToConnect() {
<-s.connected
}
// SendMessage into the mix network
func (s Session) SendMessage(recipient, provider, msg string) error {
surbID := [constants.SURBIDLength]byte{}
......@@ -91,7 +98,7 @@ func (s Session) SendMessage(recipient, provider, msg string) error {
}
// GetMessage blocks until there is a message in the inbox
func (s *Session) GetMessage(timeout int) (string, error) {
func (s *Session) GetMessage(timeout int64) (string, error) {
if timeout == 0 {
return <-s.queue, nil
}
......@@ -99,7 +106,7 @@ func (s *Session) GetMessage(timeout int) (string, error) {
select {
case msg := <-s.queue:
return msg, nil
case <-time.After(time.Second * timeout):
case <-time.After(time.Second * time.Duration(timeout)):
return "", errors.New("Timeout")
}
}
......@@ -139,6 +146,7 @@ func (s *Session) onACK(id *[constants.SURBIDLength]byte, b []byte) error {
func (s *Session) onConn(isConnected bool) {
s.log.Noticef("Peer connection status changed: %v", isConnected)
s.connected <- true
}
func (s *Session) onEmpty() error {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment