// send.go - mixnet client send
// Copyright (C) 2018  David Stainton.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

17
package session
18

19
import (
20
	"encoding/binary"
21
	"encoding/hex"
22
	"errors"
David Stainton's avatar
David Stainton committed
23 24
	"fmt"
	"io"
25
	"time"
26

27
	cConstants "github.com/katzenpost/client/constants"
David Stainton's avatar
David Stainton committed
28 29 30
	"github.com/katzenpost/core/constants"
	"github.com/katzenpost/core/crypto/rand"
	sConstants "github.com/katzenpost/core/sphinx/constants"
31 32
)

David Stainton's avatar
golint  
David Stainton committed
33
// roundTripTimeSlop is the extra time added on top of a message's ReplyETA
// before a blocking send gives up waiting for the reply.
const roundTripTimeSlop = 88 * time.Second

// ReplyTimeoutError is returned by BlockingSendUnreliableMessage when no
// reply arrives within the reply ETA plus roundTripTimeSlop.
var ReplyTimeoutError = errors.New("Failure waiting for reply, timeout reached")

37
func (s *Session) sendNext() error {
38
	msg, err := s.egressQueue.Peek()
39 40 41
	if err != nil {
		return err
	}
42 43
	if msg == nil {
		return errors.New("send next failure, message is nil")
44
	}
45 46 47 48 49
	m, ok := msg.(*Message)
	if !ok {
		return errors.New("send next failure, unknown message type")
	}
	err = s.doSend(m)
50 51 52
	if err != nil {
		return err
	}
53
	_, err = s.egressQueue.Pop()
54
	return err
David Stainton's avatar
David Stainton committed
55
}
David Stainton's avatar
David Stainton committed
56

57
func (s *Session) doSend(msg *Message) error {
58
	surbID := [sConstants.SURBIDLength]byte{}
David Stainton's avatar
David Stainton committed
59 60 61 62
	_, err := io.ReadFull(rand.Reader, surbID[:])
	if err != nil {
		return err
	}
63 64
	idStr := fmt.Sprintf("[%v]", hex.EncodeToString(surbID[:]))
	s.log.Debugf("doSend with SURB ID %x", idStr)
65 66 67 68 69 70 71
	key := []byte{}
	var eta time.Duration
	if msg.WithSURB {
		key, eta, err = s.minclient.SendCiphertext(msg.Recipient, msg.Provider, &surbID, msg.Payload)
	} else {
		err = s.minclient.SendUnreliableCiphertext(msg.Recipient, msg.Provider, msg.Payload)
	}
72 73
	if err != nil {
		return err
David Stainton's avatar
David Stainton committed
74
	}
75
	if msg.WithSURB {
76
		s.log.Debugf("doSend setting ReplyETA to %v", eta)
77 78
		msg.Key = key
		msg.SentAt = time.Now()
79
		msg.Sent = true
80
		msg.ReplyETA = eta
David Stainton's avatar
WIP  
David Stainton committed
81
		s.surbIDMap.Store(surbID, msg)
82
	}
83
	if msg.IsBlocking {
84
		sentWaitChanRaw, ok := s.sentWaitChanMap.Load(*msg.ID)
85 86 87 88 89
		if !ok {
			err := fmt.Errorf("Impossible failure, sentWaitChan not found for message ID %x", *msg.ID)
			s.log.Error(err.Error())
			s.fatalErrCh <- err
		}
90
		sentWaitChan := sentWaitChanRaw.(chan *Message)
91 92 93 94 95 96
		sentWaitChan <- msg
	} else {
		s.eventCh.In() <- &MessageSentEvent{
			MessageID: msg.ID,
			Err:       nil, // XXX send error
		}
97
	}
98
	return nil
99 100
}

101 102
func (s *Session) sendLoopDecoy() error {
	s.log.Info("sending loop decoy")
David Stainton's avatar
David Stainton committed
103
	const loopService = "loop"
104
	serviceDesc, err := s.GetService(loopService)
David Stainton's avatar
David Stainton committed
105 106 107 108
	if err != nil {
		return err
	}
	payload := [constants.UserForwardPayloadLength]byte{}
109
	id := [cConstants.MessageIDLength]byte{}
David Stainton's avatar
David Stainton committed
110 111 112 113
	_, err = io.ReadFull(rand.Reader, id[:])
	if err != nil {
		return err
	}
114
	msg := &Message{
115
		ID:        &id,
David Stainton's avatar
David Stainton committed
116 117
		Recipient: serviceDesc.Name,
		Provider:  serviceDesc.Provider,
118
		Payload:   payload[:],
119 120
		WithSURB:  true,
		IsDecoy:   true,
David Stainton's avatar
David Stainton committed
121
	}
122
	defer s.incrementDecoyLoopTally()
123
	return s.doSend(msg)
124
}
125

126 127 128 129 130 131 132 133 134
func (s *Session) sendDropDecoy() error {
	s.log.Info("sending drop decoy")
	const loopService = "loop"
	serviceDesc, err := s.GetService(loopService)
	if err != nil {
		return err
	}
	payload := [constants.UserForwardPayloadLength]byte{}
	id := [cConstants.MessageIDLength]byte{}
David Stainton's avatar
David Stainton committed
135 136 137 138
	_, err = io.ReadFull(rand.Reader, id[:])
	if err != nil {
		return err
	}
139 140 141 142 143
	msg := &Message{
		ID:        &id,
		Recipient: serviceDesc.Name,
		Provider:  serviceDesc.Provider,
		Payload:   payload[:],
144 145
		WithSURB:  false,
		IsDecoy:   true,
146 147 148 149
	}
	return s.doSend(msg)
}

150
func (s *Session) composeMessage(recipient, provider string, message []byte, isBlocking bool) (*Message, error) {
151
	s.log.Debug("SendMessage")
152
	if len(message) > constants.UserForwardPayloadLength-4 {
153
		return nil, fmt.Errorf("invalid message size: %v", len(message))
154
	}
155
	payload := [constants.UserForwardPayloadLength]byte{}
156
	binary.BigEndian.PutUint32(payload[:4], uint32(len(message)))
157
	copy(payload[4:], message)
158
	id := [cConstants.MessageIDLength]byte{}
David Stainton's avatar
David Stainton committed
159 160 161 162
	_, err := io.ReadFull(rand.Reader, id[:])
	if err != nil {
		return nil, err
	}
163
	var msg = Message{
164 165 166 167 168 169 170
		ID:         &id,
		Recipient:  recipient,
		Provider:   provider,
		Payload:    payload[:],
		WithSURB:   true,
		SURBType:   cConstants.SurbTypeKaetzchen,
		IsBlocking: isBlocking,
171
	}
172 173
	return &msg, nil
}
174

175 176
// SendUnreliableMessage asynchronously sends message without any automatic retransmissions.
func (s *Session) SendUnreliableMessage(recipient, provider string, message []byte) (MessageID, error) {
177
	msg, err := s.composeMessage(recipient, provider, message, false)
178 179 180 181 182 183 184
	if err != nil {
		return nil, err
	}
	err = s.egressQueue.Push(msg)
	if err != nil {
		return nil, err
	}
185
	return msg.ID, nil
186
}
David Stainton's avatar
WIP  
David Stainton committed
187

188 189 190 191 192
func (s *Session) BlockingSendUnreliableMessage(recipient, provider string, message []byte) ([]byte, error) {
	msg, err := s.composeMessage(recipient, provider, message, true)
	if err != nil {
		return nil, err
	}
193
	sentWaitChan := make(chan *Message)
194 195 196 197 198
	s.sentWaitChanMap.Store(*msg.ID, sentWaitChan)

	replyWaitChan := make(chan []byte)
	s.replyWaitChanMap.Store(*msg.ID, replyWaitChan)

199 200 201 202 203
	err = s.egressQueue.Push(msg)
	if err != nil {
		return nil, err
	}
	// wait until sent so that we know the ReplyETA for the waiting below
David Stainton's avatar
David Stainton committed
204
	sentMessage := <-sentWaitChan
205
	s.sentWaitChanMap.Delete(*msg.ID)
206 207 208
	// wait for reply or round trip timeout
	select {
	case reply := <-replyWaitChan:
209
		s.replyWaitChanMap.Delete(*msg.ID)
210 211 212 213 214
		return reply, nil
	case <-time.After(sentMessage.ReplyETA + roundTripTimeSlop):
		return nil, ReplyTimeoutError
	}
	// unreachable
David Stainton's avatar
WIP  
David Stainton committed
215
}