Skip to content

Commit

Permalink
add CMuxPlugin
Browse files Browse the repository at this point in the history
  • Loading branch information
smallnest committed Apr 9, 2021
1 parent 39f2702 commit 7cece5b
Show file tree
Hide file tree
Showing 4 changed files with 32 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
- support DNS as service discovery
- support rpcx flow tracing
- support websocket as the transport like tcp,kcp and quic
- add CMuxPlugin to allow developing customzied services by using the same single port

## 6.0

Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
<p style="font-size: 64px;color: red;">rpcx้‡ๆ–ฐๅปบ็ซ‹ไบ†tag,ๅฐ†tag็š„็‰ˆๆœฌๅทๆŽงๅˆถๅœจ <b>1.x.x</b> ไน‹ๅ†…๏ผŒ็œ‹[issue#567](https://github.com/smallnest/rpcx/issues/567)ๅฏนไฝ ็š„ๅฝฑๅ“</p>

**stable branch**: v1.6.2
**development branch**: master
- **stable branch**: v1.6.2
- **development branch**: master

<a href="https://rpcx.io/"><img height="160" src="http://rpcx.io/logos/rpcx-logo-text.png"></a>

Expand Down
9 changes: 7 additions & 2 deletions server/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@ func (s *Server) startGateway(network string, ln net.Listener) net.Listener {

rpcxLn := m.Match(rpcxPrefixByteMatcher())

// mux Plugins
if s.Plugins != nil {
s.Plugins.MuxMatch(m)
}

if !s.DisableJSONRPC {
jsonrpc2Ln := m.Match(cmux.HTTP1HeaderField("X-JSONRPC-2.0", "true"))
go s.startJSONRPC2(jsonrpc2Ln)
Expand Down Expand Up @@ -108,7 +113,7 @@ func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, pa
req, err := HTTPRequest2RpcxRequest(r)
defer protocol.FreeMsg(req)

//set headers
// set headers
wh.Set(XVersion, r.Header.Get(XVersion))
wh.Set(XMessageID, r.Header.Get(XMessageID))

Expand Down Expand Up @@ -175,7 +180,7 @@ func (s *Server) handleGatewayRequest(w http.ResponseWriter, r *http.Request, pa
}

s.Plugins.DoPreWriteResponse(newCtx, req, nil, nil)
if len(resMetadata) > 0 { //copy meta in context to request
if len(resMetadata) > 0 { // copy meta in context to request
meta := res.Metadata
if meta == nil {
res.Metadata = resMetadata
Expand Down
28 changes: 22 additions & 6 deletions server/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (

"github.com/smallnest/rpcx/errors"
"github.com/smallnest/rpcx/protocol"
"github.com/soheilhy/cmux"
)

//PluginContainer represents a plugin container that defines all methods to manage plugins.
//And it also defines all extension points.
// PluginContainer represents a plugin container that defines all methods to manage plugins.
// And it also defines all extension points.
type PluginContainer interface {
Add(plugin Plugin)
Remove(plugin Plugin)
Expand All @@ -36,10 +37,12 @@ type PluginContainer interface {
DoPostWriteRequest(ctx context.Context, r *protocol.Message, e error) error

DoHeartbeatRequest(ctx context.Context, req *protocol.Message) error

MuxMatch(m cmux.CMux)
}

// Plugin is the server plugin interface.
type Plugin interface {}
type Plugin interface{}

type (
// RegisterPlugin is .
Expand Down Expand Up @@ -112,6 +115,10 @@ type (
HeartbeatPlugin interface {
HeartbeatRequest(ctx context.Context, req *protocol.Message) error
}

CMuxPlugin interface {
MuxMatch(m cmux.CMux)
}
)

// pluginContainer implements PluginContainer interface.
Expand Down Expand Up @@ -198,13 +205,13 @@ func (p *pluginContainer) DoUnregister(name string) error {
return nil
}

//DoPostConnAccept handles accepted conn
// DoPostConnAccept handles accepted conn
func (p *pluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool) {
var flag bool
for i := range p.plugins {
if plugin, ok := p.plugins[i].(PostConnAcceptPlugin); ok {
conn, flag = plugin.HandleConnAccept(conn)
if !flag { //interrupt
if !flag { // interrupt
conn.Close()
return conn, false
}
Expand All @@ -213,7 +220,7 @@ func (p *pluginContainer) DoPostConnAccept(conn net.Conn) (net.Conn, bool) {
return conn, true
}

//DoPostConnClose handles closed conn
// DoPostConnClose handles closed conn
func (p *pluginContainer) DoPostConnClose(conn net.Conn) bool {
var flag bool
for i := range p.plugins {
Expand Down Expand Up @@ -368,3 +375,12 @@ func (p *pluginContainer) DoHeartbeatRequest(ctx context.Context, r *protocol.Me

return nil
}

// MuxMatch adds cmux Match.
func (p *pluginContainer) MuxMatch(m cmux.CMux) {
for i := range p.plugins {
if plugin, ok := p.plugins[i].(CMuxPlugin); ok {
plugin.MuxMatch(m)
}
}
}

0 comments on commit 7cece5b

Please sign in to comment.