diff --git a/go.mod b/go.mod index 468a16f..1a07d9b 100644 --- a/go.mod +++ b/go.mod @@ -3,8 +3,14 @@ module go.neonxp.dev/jsonrpc2 go 1.18 require ( + github.com/gobwas/ws v1.1.0 github.com/qri-io/jsonschema v0.2.1 - golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 + golang.org/x/sync v0.1.0 ) -require github.com/qri-io/jsonpointer v0.1.1 // indirect +require ( + github.com/gobwas/httphead v0.1.0 // indirect + github.com/gobwas/pool v0.2.1 // indirect + github.com/qri-io/jsonpointer v0.1.1 // indirect + golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d // indirect +) diff --git a/go.sum b/go.sum index 28b48ac..2e0dc13 100644 --- a/go.sum +++ b/go.sum @@ -1,4 +1,10 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gobwas/httphead v0.1.0 h1:exrUm0f4YX0L7EBwZHuCF4GDp8aJfVeBrlLQrs6NqWU= +github.com/gobwas/httphead v0.1.0/go.mod h1:O/RXo79gxV8G+RqlR/otEwx4Q36zl9rqC5u12GKvMCM= +github.com/gobwas/pool v0.2.1 h1:xfeeEhW7pwmX8nuLVlqbzVc7udMDrwetjEv+TZIz1og= +github.com/gobwas/pool v0.2.1/go.mod h1:q8bcK0KcYlCgd9e7WYLm9LpyS+YeLd8JVDW6WezmKEw= +github.com/gobwas/ws v1.1.0 h1:7RFti/xnNkMJnrK7D1yQ/iCIB5OrrY/54/H930kIbHA= +github.com/gobwas/ws v1.1.0/go.mod h1:nzvNcVha5eUziGrbxFCo6qFIojQHjJV5cLYIbezhfL0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/qri-io/jsonpointer v0.1.1 h1:prVZBZLL6TW5vsSB9fFHFAMBLI4b0ri5vribQlTJiBA= github.com/qri-io/jsonpointer v0.1.1/go.mod h1:DnJPaYgiKu56EuDp8TU5wFLdZIcAnb/uH9v37ZaMV64= @@ -8,5 +14,7 @@ github.com/sergi/go-diff v1.0.0 h1:Kpca3qRNrduNnOQeazBd0ysaKrUJiIuISHxogkT9RPQ= github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= -golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d h1:MiWWjyhUzZ+jvhZvloX6ZrUsdEghn8a64Upd8EMHglE= +golang.org/x/sys v0.0.0-20201207223542-d4d67f95c62d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/rpc/options.go b/rpc/options.go index 683df66..7ffab95 100644 --- a/rpc/options.go +++ b/rpc/options.go @@ -42,3 +42,9 @@ func WithLogger(l Logger) Option { s.logger = l } } + +func WithMiddlewares(mws ...Middleware) Option { + return func(s *RpcServer) { + s.middlewares = append(s.middlewares, mws...) + } +} diff --git a/transport/websockets.go b/transport/websockets.go new file mode 100644 index 0000000..e319d8e --- /dev/null +++ b/transport/websockets.go @@ -0,0 +1,100 @@ +//Package rpc provides abstract rpc server +// +//Copyright (C) 2022 Alexander Kiryukhin +// +//This file is part of go.neonxp.dev/jsonrpc2 project. +// +//This program is free software: you can redistribute it and/or modify +//it under the terms of the GNU General Public License as published by +//the Free Software Foundation, either version 3 of the License, or +//(at your option) any later version. +// +//This program is distributed in the hope that it will be useful, +//but WITHOUT ANY WARRANTY; without even the implied warranty of +//MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +//GNU General Public License for more details. +// +//You should have received a copy of the GNU General Public License +//along with this program. If not, see . + +package transport + +import ( + "context" + "crypto/tls" + "net" + "net/http" + "time" + + websocket "github.com/gobwas/ws" + "github.com/gobwas/ws/wsutil" +) + +type WebSocket struct { + Bind string + TLS *tls.Config + CORSOrigin string + Parallel bool + ReadDeadline, WriteDeadline time.Duration //Set custom timeout for future read and write calls +} + +func (ws *WebSocket) WithReadDealine() bool { return ws.ReadDeadline != 0 } +func (ws *WebSocket) WithWriteDealine() bool { return ws.WriteDeadline != 0 } + +func (ws *WebSocket) Run(ctx context.Context, resolver Resolver) error { + srv := http.Server{ + Addr: ws.Bind, + Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + wsconn, _, _, err := websocket.UpgradeHTTP(r, w) + if err != nil { + return + } + + defer func() { + wsconn.Close() + }() + + if ws.WithReadDealine() { + wsconn.SetReadDeadline(time.Now().Add(ws.ReadDeadline * time.Second)) + } + + if ws.WithWriteDealine() { + wsconn.SetWriteDeadline(time.Now().Add(ws.WriteDeadline * time.Second)) + } + + for { + + // read message from connection + _, reader, err := wsutil.NextReader(wsconn, websocket.StateServerSide) + if err != nil { + return + } + + // create writer object that implements io.WriterCloser interface + writer := wsutil.NewWriter(wsconn, websocket.StateServerSide, websocket.OpText) + + resolver.Resolve(ctx, reader, writer, ws.Parallel) + + if err := writer.Flush(); err != nil { + return + } + + } + + }), + + BaseContext: func(l net.Listener) context.Context { + return ctx + }, + } + + go func() { + <-ctx.Done() + srv.Close() + }() + + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + return err + } + return nil +}