Skip to content

Commit

Permalink
Merge pull request #31 from cloudstruct/feature/server-handshake
Browse files Browse the repository at this point in the history
Basic server support
  • Loading branch information
agaffney authored Mar 3, 2022
2 parents 2c422e8 + 7163ce0 commit 88740b0
Show file tree
Hide file tree
Showing 13 changed files with 298 additions and 107 deletions.
27 changes: 26 additions & 1 deletion cmd/go-ouroboros-network/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func buildBlockFetchCallbackConfig() *blockfetch.BlockFetchCallbackConfig {
}
}

func testChainSync(o *ouroboros.Ouroboros, f *globalFlags) {
func testChainSync(f *globalFlags) {
chainSyncFlags := newChainSyncFlags()
err := chainSyncFlags.flagset.Parse(f.flagset.Args()[1:])
if err != nil {
Expand All @@ -93,6 +93,31 @@ func testChainSync(o *ouroboros.Ouroboros, f *globalFlags) {
fmt.Printf("ERROR: unknown era '%s' specified as chain-sync start point\n", chainSyncFlags.startEra)
os.Exit(1)
}

conn := createClientConnection(f)
errorChan := make(chan error)
oOpts := &ouroboros.OuroborosOptions{
Conn: conn,
NetworkMagic: uint32(f.networkMagic),
ErrorChan: errorChan,
UseNodeToNodeProtocol: f.ntnProto,
SendKeepAlives: true,
ChainSyncCallbackConfig: buildChainSyncCallbackConfig(),
BlockFetchCallbackConfig: buildBlockFetchCallbackConfig(),
}
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}
}()
o, err := ouroboros.New(oOpts)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}

syncState.oConn = o
syncState.readyForNextBlockChan = make(chan bool)
syncState.nodeToNode = f.ntnProto
Expand Down
25 changes: 24 additions & 1 deletion cmd/go-ouroboros-network/localtxsubmission.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func buildLocalTxSubmissionCallbackConfig() *localtxsubmission.CallbackConfig {
}
}

func testLocalTxSubmission(o *ouroboros.Ouroboros, f *globalFlags) {
func testLocalTxSubmission(f *globalFlags) {
localTxSubmissionFlags := newLocalTxSubmissionFlags()
err := localTxSubmissionFlags.flagset.Parse(f.flagset.Args()[1:])
if err != nil {
Expand All @@ -48,6 +48,29 @@ func testLocalTxSubmission(o *ouroboros.Ouroboros, f *globalFlags) {

localTxSubmitState.submitResponse = make(chan bool)

conn := createClientConnection(f)
errorChan := make(chan error)
oOpts := &ouroboros.OuroborosOptions{
Conn: conn,
NetworkMagic: uint32(f.networkMagic),
ErrorChan: errorChan,
UseNodeToNodeProtocol: f.ntnProto,
SendKeepAlives: true,
LocalTxSubmissionCallbackConfig: buildLocalTxSubmissionCallbackConfig(),
}
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}
}()
o, err := ouroboros.New(oOpts)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}

txData, err := ioutil.ReadFile(localTxSubmissionFlags.txFile)
if err != nil {
fmt.Printf("Failed to load transaction file: %s\n", err)
Expand Down
83 changes: 33 additions & 50 deletions cmd/go-ouroboros-network/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"crypto/tls"
"flag"
"fmt"
ouroboros "github.com/cloudstruct/go-ouroboros-network"
"io"
"net"
"os"
)
Expand Down Expand Up @@ -50,29 +48,6 @@ func main() {
os.Exit(1)
}

var conn io.ReadWriteCloser
var dialProto string
var dialAddress string
if f.socket != "" {
dialProto = "unix"
dialAddress = f.socket
} else if f.address != "" {
dialProto = "tcp"
dialAddress = f.address
} else {
fmt.Printf("You must specify one of -socket or -address\n\n")
flag.PrintDefaults()
os.Exit(1)
}
if f.useTls {
conn, err = tls.Dial(dialProto, dialAddress, nil)
} else {
conn, err = net.Dial(dialProto, dialAddress)
}
if err != nil {
fmt.Printf("Connection failed: %s\n", err)
os.Exit(1)
}
if f.networkMagic == 0 {
if f.testnet {
f.networkMagic = TESTNET_MAGIC
Expand All @@ -84,35 +59,15 @@ func main() {
os.Exit(1)
}
}
errorChan := make(chan error, 10)
oOpts := &ouroboros.OuroborosOptions{
Conn: conn,
NetworkMagic: uint32(f.networkMagic),
ErrorChan: errorChan,
UseNodeToNodeProtocol: f.ntnProto,
SendKeepAlives: true,
ChainSyncCallbackConfig: buildChainSyncCallbackConfig(),
BlockFetchCallbackConfig: buildBlockFetchCallbackConfig(),
LocalTxSubmissionCallbackConfig: buildLocalTxSubmissionCallbackConfig(),
}
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}
}()
o, err := ouroboros.New(oOpts)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
os.Exit(1)
}

if len(f.flagset.Args()) > 0 {
switch f.flagset.Arg(0) {
case "chain-sync":
testChainSync(o, f)
testChainSync(f)
case "local-tx-submission":
testLocalTxSubmission(o, f)
testLocalTxSubmission(f)
case "server":
testServer(f)
default:
fmt.Printf("Unknown subcommand: %s\n", f.flagset.Arg(0))
os.Exit(1)
Expand All @@ -122,3 +77,31 @@ func main() {
os.Exit(1)
}
}

func createClientConnection(f *globalFlags) net.Conn {
var err error
var conn net.Conn
var dialProto string
var dialAddress string
if f.socket != "" {
dialProto = "unix"
dialAddress = f.socket
} else if f.address != "" {
dialProto = "tcp"
dialAddress = f.address
} else {
fmt.Printf("You must specify one of -socket or -address\n\n")
flag.PrintDefaults()
os.Exit(1)
}
if f.useTls {
conn, err = tls.Dial(dialProto, dialAddress, nil)
} else {
conn, err = net.Dial(dialProto, dialAddress)
}
if err != nil {
fmt.Printf("Connection failed: %s\n", err)
os.Exit(1)
}
return conn
}
83 changes: 83 additions & 0 deletions cmd/go-ouroboros-network/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package main

import (
"flag"
"fmt"
ouroboros "github.com/cloudstruct/go-ouroboros-network"
"net"
"os"
)

type serverFlags struct {
flagset *flag.FlagSet
//txFile string
}

func newServerFlags() *serverFlags {
f := &serverFlags{
flagset: flag.NewFlagSet("server", flag.ExitOnError),
}
//f.flagset.StringVar(&f.txFile, "tx-file", "", "path to the transaction file to submit")
return f
}

func createListenerSocket(f *globalFlags) (net.Listener, error) {
var err error
var listen net.Listener
if f.socket != "" {
if err := os.RemoveAll(f.socket); err != nil {
return nil, fmt.Errorf("failed to remove existing socket: %s", err)
}
listen, err = net.Listen("unix", f.socket)
if err != nil {
return nil, fmt.Errorf("failed to open listening socket: %s", err)
}
} else if f.address != "" {
listen, err = net.Listen("tcp", f.address)
if err != nil {
return nil, fmt.Errorf("failed to open listening socket: %s", err)
}
}
return listen, nil
}

func testServer(f *globalFlags) {
serverFlags := newServerFlags()
err := serverFlags.flagset.Parse(f.flagset.Args()[1:])
if err != nil {
fmt.Printf("failed to parse subcommand args: %s\n", err)
os.Exit(1)
}

listen, err := createListenerSocket(f)
if err != nil {
fmt.Printf("ERROR: failed to create listener: %s\n", err)
os.Exit(1)
}

for {
conn, err := listen.Accept()
if err != nil {
fmt.Printf("ERROR: failed to accept connection: %s\n", err)
continue
}
errorChan := make(chan error)
oOpts := &ouroboros.OuroborosOptions{
Conn: conn,
NetworkMagic: uint32(f.networkMagic),
ErrorChan: errorChan,
UseNodeToNodeProtocol: f.ntnProto,
Server: true,
}
go func() {
for {
err := <-errorChan
fmt.Printf("ERROR: %s\n", err)
}
}()
_, err = ouroboros.New(oOpts)
if err != nil {
fmt.Printf("ERROR: %s\n", err)
}
}
}
48 changes: 29 additions & 19 deletions ouroboros.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,19 @@ package ouroboros

import (
"github.com/cloudstruct/go-ouroboros-network/muxer"
"github.com/cloudstruct/go-ouroboros-network/protocol"
"github.com/cloudstruct/go-ouroboros-network/protocol/blockfetch"
"github.com/cloudstruct/go-ouroboros-network/protocol/chainsync"
"github.com/cloudstruct/go-ouroboros-network/protocol/handshake"
"github.com/cloudstruct/go-ouroboros-network/protocol/keepalive"
"github.com/cloudstruct/go-ouroboros-network/protocol/localtxsubmission"
"io"
"net"
)

type Ouroboros struct {
conn io.ReadWriteCloser
conn net.Conn
networkMagic uint32
waitForHandshake bool
server bool
useNodeToNodeProto bool
handshakeComplete bool
muxer *muxer.Muxer
Expand All @@ -33,12 +33,10 @@ type Ouroboros struct {
}

type OuroborosOptions struct {
Conn io.ReadWriteCloser
NetworkMagic uint32
ErrorChan chan error
// Whether to wait for the other side to initiate the handshake. This is useful
// for servers
WaitForHandshake bool
Conn net.Conn
NetworkMagic uint32
ErrorChan chan error
Server bool
UseNodeToNodeProtocol bool
SendKeepAlives bool
ChainSyncCallbackConfig *chainsync.ChainSyncCallbackConfig
Expand All @@ -51,7 +49,7 @@ func New(options *OuroborosOptions) (*Ouroboros, error) {
o := &Ouroboros{
conn: options.Conn,
networkMagic: options.NetworkMagic,
waitForHandshake: options.WaitForHandshake,
server: options.Server,
useNodeToNodeProto: options.UseNodeToNodeProtocol,
chainSyncCallbackConfig: options.ChainSyncCallbackConfig,
blockFetchCallbackConfig: options.BlockFetchCallbackConfig,
Expand Down Expand Up @@ -92,37 +90,49 @@ func (o *Ouroboros) setupConnection() error {
err := <-o.muxer.ErrorChan
o.ErrorChan <- err
}()
// Perform handshake
o.Handshake = handshake.New(o.muxer, o.ErrorChan, o.useNodeToNodeProto)
protoOptions := protocol.ProtocolOptions{
Muxer: o.muxer,
ErrorChan: o.ErrorChan,
}
var protoVersions []uint16
if o.useNodeToNodeProto {
protoVersions = GetProtocolVersionsNtN()
protoOptions.Mode = protocol.ProtocolModeNodeToNode
} else {
protoVersions = GetProtocolVersionsNtC()
protoOptions.Mode = protocol.ProtocolModeNodeToClient
}
if o.server {
protoOptions.Role = protocol.ProtocolRoleServer
} else {
protoOptions.Role = protocol.ProtocolRoleClient
}
// Perform handshake
o.Handshake = handshake.New(protoOptions, protoVersions)
// TODO: figure out better way to signify automatic handshaking and returning the chosen version
if !o.waitForHandshake {
if !o.server {
err := o.Handshake.ProposeVersions(protoVersions, o.networkMagic)
if err != nil {
return err
}
}
o.handshakeComplete = <-o.Handshake.Finished
// TODO: register additional mini-protocols
if o.useNodeToNodeProto {
versionNtN := GetProtocolVersionNtN(o.Handshake.Version)
o.ChainSync = chainsync.New(o.muxer, o.ErrorChan, o.useNodeToNodeProto, o.chainSyncCallbackConfig)
o.BlockFetch = blockfetch.New(o.muxer, o.ErrorChan, o.blockFetchCallbackConfig)
protoOptions.Mode = protocol.ProtocolModeNodeToNode
o.ChainSync = chainsync.New(protoOptions, o.chainSyncCallbackConfig)
o.BlockFetch = blockfetch.New(protoOptions, o.blockFetchCallbackConfig)
if versionNtN.EnableKeepAliveProtocol {
o.KeepAlive = keepalive.New(o.muxer, o.ErrorChan, o.keepAliveCallbackConfig)
o.KeepAlive = keepalive.New(protoOptions, o.keepAliveCallbackConfig)
if o.sendKeepAlives {
o.KeepAlive.Start()
}
}
} else {
//versionNtC := GetProtocolVersionNtC(o.Handshake.Version)
o.ChainSync = chainsync.New(o.muxer, o.ErrorChan, o.useNodeToNodeProto, o.chainSyncCallbackConfig)
o.LocalTxSubmission = localtxsubmission.New(o.muxer, o.ErrorChan, o.localTxSubmissionCallbackConfig)
protoOptions.Mode = protocol.ProtocolModeNodeToClient
o.ChainSync = chainsync.New(protoOptions, o.chainSyncCallbackConfig)
o.LocalTxSubmission = localtxsubmission.New(protoOptions, o.localTxSubmissionCallbackConfig)
}
return nil
}
Loading

0 comments on commit 88740b0

Please sign in to comment.