Skip to content

Support read-only local mirror #288

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 22 commits into from
Mar 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cleanupDB.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,6 @@ cd ${PROJECT_DIR} && find . -name '*.db' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name '*.db-shm' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name '*.db-wal' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name 'db.meta' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name 'public.keystore' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name '*.public.keystore' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name 'public.keystore*' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -name '*.public.keystore*' -exec rm -vf {} \;
cd ${PROJECT_DIR} && find . -type d -name '*.ldb' -prune -exec rm -vrf {} \;
8 changes: 8 additions & 0 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
const (
paramUseLeader = "use_leader"
paramUseFollower = "use_follower"
paramMirror = "mirror"
)

// Config is a configuration parsed from a DSN string.
Expand All @@ -40,6 +41,9 @@ type Config struct {

// UseFollower use follower nodes to do queries
UseFollower bool

// Mirror option forces client to query from mirror server
Mirror string
}

// NewConfig creates a new config with default value.
Expand All @@ -64,6 +68,9 @@ func (cfg *Config) FormatDSN() string {
newQuery.Add(paramUseLeader, strconv.FormatBool(cfg.UseLeader))
}
}
if cfg.Mirror != "" {
newQuery.Add(paramMirror, cfg.Mirror)
}
u.RawQuery = newQuery.Encode()

return u.String()
Expand All @@ -90,6 +97,7 @@ func ParseDSN(dsn string) (cfg *Config, err error) {
if !cfg.UseLeader && !cfg.UseFollower {
cfg.UseLeader = true
}
cfg.Mirror = q.Get(paramMirror)

return cfg, nil
}
9 changes: 9 additions & 0 deletions client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,13 @@ func TestConfig(t *testing.T) {
UseFollower: true,
})
})

Convey("test format and parse dsn with mirror option", t, func() {
cfg, err := ParseDSN("covenantsql://db?mirror=happy")
So(err, ShouldBeNil)
So(cfg.Mirror, ShouldEqual, "happy")
So(cfg.FormatDSN(), ShouldEqual, "covenantsql://db?mirror=happy")
cfg.Mirror = ""
So(cfg.FormatDSN(), ShouldEqual, "covenantsql://db")
})
}
91 changes: 54 additions & 37 deletions client/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ type conn struct {
type pconn struct {
parent *conn
ackCh chan *types.Ack
pCaller *rpc.PersistentCaller
pCaller rpc.PCaller
}

func newConn(cfg *Config) (c *conn, err error) {
Expand Down Expand Up @@ -84,39 +84,48 @@ func newConn(cfg *Config) (c *conn, err error) {
return nil, errors.WithMessage(err, "cacheGetPeers failed")
}

if cfg.UseLeader {
if cfg.Mirror != "" {
c.leader = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(peers.Leader),
pCaller: rpc.NewRawCaller(cfg.Mirror),
}

// no ack workers required, mirror mode does not support ack worker
} else {
if cfg.UseLeader {
c.leader = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(peers.Leader),
}
}
}

// choose a random follower node
if cfg.UseFollower && len(peers.Servers) > 1 {
for {
node := peers.Servers[randSource.Intn(len(peers.Servers))]
if node != peers.Leader {
c.follower = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(node),
// choose a random follower node
if cfg.UseFollower && len(peers.Servers) > 1 {
for {
node := peers.Servers[randSource.Intn(len(peers.Servers))]
if node != peers.Leader {
c.follower = &pconn{
parent: c,
pCaller: rpc.NewPersistentCaller(node),
}
break
}
break
}
}
}

if c.leader == nil && c.follower == nil {
return nil, errors.New("no follower peers found")
}
if c.leader == nil && c.follower == nil {
return nil, errors.New("no follower peers found")
}

if c.leader != nil {
if err := c.leader.startAckWorkers(2); err != nil {
return nil, errors.WithMessage(err, "leader startAckWorkers failed")
if c.leader != nil {
if err := c.leader.startAckWorkers(2); err != nil {
return nil, errors.WithMessage(err, "leader startAckWorkers failed")
}
}
}
if c.follower != nil {
if err := c.follower.startAckWorkers(2); err != nil {
return nil, errors.WithMessage(err, "follower startAckWorkers failed")
if c.follower != nil {
if err := c.follower.startAckWorkers(2); err != nil {
return nil, errors.WithMessage(err, "follower startAckWorkers failed")
}
}
}

Expand All @@ -133,13 +142,19 @@ func (c *pconn) startAckWorkers(workerCount int) (err error) {
}

func (c *pconn) stopAckWorkers() {
close(c.ackCh)
if c.ackCh != nil {
select {
case <-c.ackCh:
default:
close(c.ackCh)
}
}
}

func (c *pconn) ackWorker() {
var (
oneTime sync.Once
pc *rpc.PersistentCaller
pc rpc.PCaller
err error
)

Expand All @@ -150,10 +165,10 @@ ackWorkerLoop:
break ackWorkerLoop
}
oneTime.Do(func() {
pc = rpc.NewPersistentCaller(c.pCaller.TargetID)
pc = c.pCaller.New()
})
if err = ack.Sign(c.parent.privKey); err != nil {
log.WithField("target", pc.TargetID).WithError(err).Error("failed to sign ack")
log.WithField("target", pc.Target()).WithError(err).Error("failed to sign ack")
continue
}

Expand Down Expand Up @@ -375,7 +390,7 @@ func (c *conn) sendQuery(ctx context.Context, queryType types.QueryType, queries
"type": queryType.String(),
"connID": connID,
"seqNo": seqNo,
"target": uc.pCaller.TargetID,
"target": uc.pCaller.Target(),
"source": c.localNodeID,
}).WithError(err).Debug("send query")
}()
Expand Down Expand Up @@ -415,15 +430,17 @@ func (c *conn) sendQuery(ctx context.Context, queryType types.QueryType, queries
// build ack
func() {
defer trace.StartRegion(ctx, "ackEnqueue").End()
uc.ackCh <- &types.Ack{
Header: types.SignedAckHeader{
AckHeader: types.AckHeader{
Response: response.Header.ResponseHeader,
ResponseHash: response.Header.Hash(),
NodeID: c.localNodeID,
Timestamp: getLocalTime(),
if uc.ackCh != nil {
uc.ackCh <- &types.Ack{
Header: types.SignedAckHeader{
AckHeader: types.AckHeader{
Response: response.Header.ResponseHeader,
ResponseHash: response.Header.Hash(),
NodeID: c.localNodeID,
Timestamp: getLocalTime(),
},
},
},
}
}
}()

Expand Down
31 changes: 31 additions & 0 deletions cmd/cql-minerd/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -596,6 +596,37 @@ func TestFullProcess(t *testing.T) {
err = db.Close()
So(err, ShouldBeNil)

// test query from follower node
dsnCfgMix := *dsnCfg
dsnCfgMix.UseLeader = true
dsnCfgMix.UseFollower = true
dbMix, err := sql.Open("covenantsql", dsnCfgMix.FormatDSN())
So(err, ShouldBeNil)
defer dbMix.Close()

result = 0
err = dbMix.QueryRow("SELECT * FROM test LIMIT 1").Scan(&result)
So(err, ShouldBeNil)
So(result, ShouldEqual, 4)

_, err = dbMix.Exec("INSERT INTO test VALUES(2)")
So(err, ShouldBeNil)

// test query from follower only
dsnCfgFollower := *dsnCfg
dsnCfgFollower.UseLeader = false
dsnCfgFollower.UseFollower = true
dbFollower, err := sql.Open("covenantsql", dsnCfgFollower.FormatDSN())
So(err, ShouldBeNil)
defer dbFollower.Close()

err = dbFollower.QueryRow("SELECT * FROM test LIMIT 1").Scan(&result)
So(err, ShouldBeNil)
So(result, ShouldEqual, 4)

_, err = dbFollower.Exec("INSERT INTO test VALUES(2)")
So(err, ShouldNotBeNil)

// TODO(lambda): Drop database
})
}
Expand Down
53 changes: 0 additions & 53 deletions cmd/cql-observer/node.go

This file was deleted.

15 changes: 7 additions & 8 deletions cmd/cql/internal/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,20 @@ package internal

import (
"context"
"net/http"
"time"

"github.com/CovenantSQL/CovenantSQL/sqlchain/adapter"
"github.com/CovenantSQL/CovenantSQL/utils"
)

var (
adapterAddr string // adapter listen addr

adapterHTTPServer *http.Server
adapterAddr string // adapter listen addr
adapterUseMirrorAddr string
)

// CmdAdapter is cql adapter command entity.
var CmdAdapter = &Command{
UsageLine: "cql adapter [-config file] [-tmp-path path] [-bg-log-level level] address",
UsageLine: "cql adapter [-config file] [-tmp-path path] [-bg-log-level level] [-mirror addr] address",
Short: "start a SQLChain adapter",
Long: `
Adapter command serves a SQLChain adapter
Expand All @@ -44,13 +42,14 @@ e.g.

func init() {
CmdAdapter.Run = runAdapter
CmdAdapter.Flag.StringVar(&adapterUseMirrorAddr, "mirror", "", "mirror server for adapter to query")

addCommonFlags(CmdAdapter)
addBgServerFlag(CmdAdapter)
}

func startAdapterServer(adapterAddr string) func() {
adapterHTTPServer, err := adapter.NewHTTPAdapter(adapterAddr, configFile)
func startAdapterServer(adapterAddr string, adapterUseMirrorAddr string) func() {
adapterHTTPServer, err := adapter.NewHTTPAdapter(adapterAddr, configFile, adapterUseMirrorAddr)
if err != nil {
ConsoleLog.WithError(err).Error("init adapter failed")
SetExitStatus(1)
Expand Down Expand Up @@ -84,7 +83,7 @@ func runAdapter(cmd *Command, args []string) {
}
adapterAddr = args[0]

cancelFunc := startAdapterServer(adapterAddr)
cancelFunc := startAdapterServer(adapterAddr, adapterUseMirrorAddr)
ExitIfErrors()
defer cancelFunc()

Expand Down
3 changes: 2 additions & 1 deletion cmd/cql/internal/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ func (c *Command) Runnable() bool {

var atExitFuncs []func()

func atExit(f func()) {
// AtExit will register function to be executed before exit.
func AtExit(f func()) {
atExitFuncs = append(atExitFuncs, f)
}

Expand Down
2 changes: 2 additions & 0 deletions cmd/cql/internal/cfg.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ func configInit() {
Exit()
}

ConsoleLog.Info("init config success")

// TODO(leventeliu): discover more specific confirmation duration from config. We don't have
// enough informations from config to do that currently, so just use a fixed and long enough
// duration.
Expand Down
2 changes: 1 addition & 1 deletion cmd/cql/internal/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ func runConsole(cmd *Command, args []string) {
}

if adapterAddr != "" {
cancelFunc := startAdapterServer(adapterAddr)
cancelFunc := startAdapterServer(adapterAddr, "")
defer cancelFunc()
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/cql/internal/drop.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func runDrop(cmd *Command, args []string) {

// drop database
if _, err := client.ParseDSN(dsn); err != nil {
// not a dsn
// not a dsn/dbid
ConsoleLog.WithField("db", dsn).WithError(err).Error("Not a valid dsn")
SetExitStatus(1)
return
Expand Down
Loading