plugin.go 源碼閱讀


package pingo

import (
    "bufio"
    "errors"
    "fmt"
    "io"
    "log"
    "net"
    "net/rpc"
    "os"
    "os/exec"
    "strings"
    "time"
)

var (
    errInvalidMessage      = ErrInvalidMessage(errors.New("Invalid ready message"))
    errRegistrationTimeout = ErrRegistrationTimeout(errors.New("Registration timed out"))
)

// Represents a plugin. After being created the plugin is not started or ready to run.
//
// Additional configuration (ErrorHandler and Timeout) can be set after initialization.
//
// Use Start() to make the plugin available.
type Plugin struct {
    exe         string
    proto       string
    unixdir     string
    params      []string
    initTimeout time.Duration
    exitTimeout time.Duration
    handler     ErrorHandler
    running     bool
    meta        meta
    objsCh      chan *objects
    connCh      chan *conn
    killCh      chan *waiter
    exitCh      chan struct{}
}

// NewPlugin create a new plugin ready to be started, or returns an error if the initial setup fails.
//
// The first argument specifies the protocol. It can be either set to "unix" for communication on an
// ephemeral local socket, or "tcp" for network communication on the local host (using a random
// unprivileged port.)
//
// This constructor will panic if the proto argument is neither "unix" nor "tcp".
//
// The path to the plugin executable should be absolute. Any path accepted by the "exec" package in the
// standard library is accepted and the same rules for execution are applied.
//
// Optionally some parameters might be passed to the plugin executable.
func NewPlugin(proto, path string, params ...string) *Plugin {
    if proto != "unix" && proto != "tcp" {
        panic("Invalid protocol. Specify 'unix' or 'tcp'.")
    }
    p := &Plugin{
        exe:         path,
        proto:       proto,
        params:      params,
        initTimeout: 2 * time.Second,
        exitTimeout: 2 * time.Second,
        handler:     NewDefaultErrorHandler(),
        meta:        meta("pingo" + randstr(5)),
        objsCh:      make(chan *objects),
        connCh:      make(chan *conn),
        killCh:      make(chan *waiter),
        exitCh:      make(chan struct{}),
    }
    return p
}

// Set the error (and output) handler implementation.  Use this to set a custom implementation.
// By default, standard logging is used.  See ErrorHandler.
//
// Panics if called after Start.
func (p *Plugin) SetErrorHandler(h ErrorHandler) {
    if p.running {
        panic("Cannot call SetErrorHandler after Start")
    }
    p.handler = h
}

// Set the maximum time a plugin is allowed to start up and to shut down.  Empty timeout (zero)
// is not allowed, default will be used.
//
// Default is two seconds.
//
// Panics if called after Start.
func (p *Plugin) SetTimeout(t time.Duration) {
    if p.running {
        panic("Cannot call SetTimeout after Start")
    }
    if t == 0 {
        return
    }
    p.initTimeout = t
    p.exitTimeout = t
}

func (p *Plugin) SetSocketDirectory(dir string) {
    if p.running {
        panic("Cannot call SetSocketDirectory after Start")
    }
    p.unixdir = dir
}

// Default string representation
func (p *Plugin) String() string {
    return fmt.Sprintf("%s %s", p.exe, strings.Join(p.params, " "))
}

// Start will execute the plugin as a subprocess. Start will return immediately. Any first call to the
// plugin will reveal eventual errors occurred at initialization.
//
// Calls subsequent to Start will hang until the plugin has been properly initialized.
func (p *Plugin) Start() {
    p.running = true
    go p.run()
}

// Stop attemps to stop cleanly or kill the running plugin, then will free all resources.
// Stop returns when the plugin as been shut down and related routines have exited.
func (p *Plugin) Stop() {
    wr := newWaiter()
    p.killCh <- wr
    wr.wait()
    p.exitCh <- struct{}{}
}

// Call performs an RPC call to the plugin. Prior to calling Call, the plugin must have been
// initialized by calling Start.
//
// Call will hang until a plugin has been initialized; it will return any error that happens
// either when performing the call or during plugin initialization via Start.
//
// Please refer to the "rpc" package from the standard library for more information on the
// semantics of this function.
func (p *Plugin) Call(name string, args interface{}, resp interface{}) error {
    conn := &conn{wr: newWaiter()}
    p.connCh <- conn
    conn.wr.wait()

    if conn.err != nil {
        return conn.err
    }

    return conn.client.Call(name, args, resp)
}

// Objects returns a list of the exported objects from the plugin. Exported objects used
// internally are not reported.
//
// Like Call, Objects returns any error happened on initialization if called after Start.
func (p *Plugin) Objects() ([]string, error) {
    objects := &objects{wr: newWaiter()}
    p.objsCh <- objects
    objects.wr.wait()

    return objects.list, objects.err
}

// ErrorHandler is the interface used by Plugin to report non-fatal errors and any other
// output from the plugin.
//
// A default implementation is provided and used if none is specified on plugin creation.
type ErrorHandler interface {
    // Error is called whenever a non-fatal error occurs in the plugin subprocess.
    Error(error)
    // Print is called for each line of output received from the plugin subprocess.
    Print(interface{})
}

// Default error handler implementation. Uses the default logging facility from the
// Go standard library.
type DefaultErrorHandler struct{}

// Constructor for default error handler.
func NewDefaultErrorHandler() *DefaultErrorHandler {
    return &DefaultErrorHandler{}
}

// Log via default standard library facility prepending the "error: " string.
func (e *DefaultErrorHandler) Error(err error) {
    log.Print("error: ", err)
}

// Log via default standard library facility.
func (e *DefaultErrorHandler) Print(s interface{}) {
    log.Print(s)
}

const internalObject = "PingoRpc"

type conn struct {
    client *rpc.Client
    err    error
    wr     *waiter
}

type waiter struct {
    c chan struct{}
}

func newWaiter() *waiter {
    return &waiter{c: make(chan struct{})}
}

func (wr *waiter) wait() {
    <-wr.c
}

func (wr *waiter) done() {
    close(wr.c)
}

func (wr *waiter) reset() {
    wr.c = make(chan struct{})
}

type client struct {
    *rpc.Client
    secret string
}

func newClient(s string, conn io.ReadWriteCloser) *client {
    return &client{secret: s, Client: rpc.NewClient(conn)}
}

func (c *client) authenticate(w io.Writer) error {
    _, err := io.WriteString(w, "Auth-Token: "+c.secret+"\n\n")
    return err
}

func dialAuthRpc(secret, network, address string, timeout time.Duration) (*rpc.Client, error) {
    conn, err := net.DialTimeout(network, address, timeout)
    if err != nil {
        return nil, err
    }
    c := newClient(secret, conn)
    if err := c.authenticate(conn); err != nil {
        return nil, err
    }
    return c.Client, nil
}

type objects struct {
    list []string
    err  error
    wr   *waiter
}

type ctrl struct {
    p    *Plugin
    objs []string
    // Protocol and address for RPC
    proto, addr string
    // Secret needed to connect to server
    secret string
    // Unrecoverable error is used as response to calls after it happened.
    err error
    // This channel is an alias to p.connCh. It allows to
    // intermittedly process calls (only when we can handle them).
    connCh chan *conn
    // Same as above, but for objects requests
    objsCh chan *objects
    // Timeout on plugin startup time
    timeoutCh <-chan time.Time
    // Get notification from Wait on the subprocess
    waitCh chan error
    // Get output lines from subprocess
    linesCh chan string
    // Respond to a routine waiting for this mail loop to exit.
    over *waiter
    // Executable
    proc *os.Process
    // RPC client to subprocess
    client *rpc.Client
}

func newCtrl(p *Plugin, t time.Duration) *ctrl {
    return &ctrl{
        p:         p,
        timeoutCh: time.After(t),
        linesCh:   make(chan string),
        waitCh:    make(chan error),
    }
}

func (c *ctrl) fatal(err error) {
    c.err = err
    c.open()
    c.kill()
}

func (c *ctrl) isFatal() bool {
    return c.err != nil
}

func (c *ctrl) close() {
    c.connCh = nil
    c.objsCh = nil
}

func (c *ctrl) open() {
    c.connCh = c.p.connCh
    c.objsCh = c.p.objsCh
}

func (c *ctrl) ready(val string) bool {
    var err error

    if err := c.parseReady(val); err != nil {
        c.fatal(err)
        return false
    }

    c.client, err = dialAuthRpc(c.secret, c.proto, c.addr, c.p.initTimeout)
    if err != nil {
        c.fatal(err)
        return false
    }

    // Remove the temp socket now that we are connected
    if c.proto == "unix" {
        if err := os.Remove(c.addr); err != nil {
            c.p.handler.Error(errors.New("Cannot remove temporary socket: " + err.Error()))
        }
    }

    // Defuse the timeout on ready
    c.timeoutCh = nil

    return true
}

func (c *ctrl) readOutput(r io.Reader) {
    scanner := bufio.NewScanner(r)

    for scanner.Scan() {
        c.linesCh <- scanner.Text()
    }
}

func (c *ctrl) waitErr(pidCh chan<- int, err error) {
    close(pidCh)
    c.waitCh <- err
}

func (c *ctrl) wait(pidCh chan<- int, exe string, params ...string) {
    defer close(c.waitCh)

    cmd := exec.Command(exe, params...)

    stdout, err := cmd.StdoutPipe()
    if err != nil {
        c.waitErr(pidCh, err)
        return
    }
    stderr, err := cmd.StderrPipe()
    if err != nil {
        c.waitErr(pidCh, err)
        return
    }
    if err := cmd.Start(); err != nil {
        c.waitErr(pidCh, err)
        return
    }

    pidCh <- cmd.Process.Pid
    close(pidCh)

    c.readOutput(stdout)
    c.readOutput(stderr)

    c.waitCh <- cmd.Wait()
}

func (c *ctrl) kill() {
    if c.proc == nil {
        return
    }
    // Ignore errors here because Kill might have been called after
    // process has ended.
    c.proc.Kill()
    c.proc = nil
}

func (c *ctrl) parseReady(str string) error {
    if !strings.HasPrefix(str, "proto=") {
        return errInvalidMessage
    }
    str = str[6:]
    s := strings.IndexByte(str, ' ')
    if s < 0 {
        return errInvalidMessage
    }
    proto := str[0:s]
    if proto != "unix" && proto != "tcp" {
        return errInvalidMessage
    }
    c.proto = proto

    str = str[s+1:]
    if !strings.HasPrefix(str, "addr=") {
        return errInvalidMessage
    }
    c.addr = str[5:]

    return nil
}

// Copy the list of objects for the requestor
func (c *ctrl) objects() []string {
    list := make([]string, len(c.objs)-1)
    for i, j := 0, 0; i < len(c.objs); i++ {
        if c.objs[i] == internalObject {
            continue
        }
        list[j] = c.objs[i]
        j = j + 1
    }
    return list
}

func (p *Plugin) run() {
    if p.unixdir == "" {
        p.unixdir = os.TempDir()
    }

    params := []string{
        "-pingo:prefix=" + string(p.meta),
        "-pingo:proto=" + p.proto,
    }
    if p.proto == "unix" && p.unixdir != "" {
        params = append(params, "-pingo:unixdir="+p.unixdir)
    }
    for i := 0; i < len(p.params); i++ {
        params = append(params, p.params[i])
    }

    c := newCtrl(p, p.initTimeout)

    pidCh := make(chan int)
    go c.wait(pidCh, p.exe, params...)
    pid := <-pidCh

    if pid != 0 {
        if proc, err := os.FindProcess(pid); err == nil {
            c.proc = proc
        }
    }

    for {
        select {
        case <-c.timeoutCh:
            c.fatal(errRegistrationTimeout)
        case r := <-c.connCh:
            if c.isFatal() {
                r.err = c.err
                r.wr.done()
                continue
            }

            r.client = c.client
            r.wr.done()
        case o := <-c.objsCh:
            if c.isFatal() {
                o.err = c.err
                o.wr.done()
                continue
            }

            o.list = c.objects()
            o.wr.done()
        case line := <-c.linesCh:
            key, val := p.meta.parse(line)
            switch key {
            case "auth-token":
                c.secret = val
            case "fatal":
                if err := parseError(val); err != nil {
                    c.fatal(err)
                } else {
                    c.fatal(errors.New(val))
                }
            case "error":
                if err := parseError(val); err != nil {
                    p.handler.Print(err)
                } else {
                    p.handler.Print(errors.New(val))
                }
            case "objects":
                c.objs = strings.Split(val, ", ")
            case "ready":
                if !c.ready(val) {
                    continue
                }
                // Start accepting calls
                c.open()
            default:
                p.handler.Print(line)
            }
        case wr := <-p.killCh:
            if c.waitCh == nil {
                wr.done()
                continue
            }

            // If we don't accept calls, kill immediately
            if c.connCh == nil || c.client == nil {
                c.kill()
            } else {
                // Be sure to kill the process if it doesn't obey Exit.
                go func(pid int, t time.Duration) {
                    <-time.After(t)

                    if proc, err := os.FindProcess(pid); err == nil {
                        proc.Kill()
                    }
                }(pid, p.exitTimeout)

                c.client.Call(internalObject+".Exit", 0, nil)
            }

            if c.client != nil {
                c.client.Close()
            }

            // Do not accept calls
            c.close()

            // When wait on the subprocess is exited, signal back via "over"
            c.over = wr
        case err := <-c.waitCh:
            if err != nil {
                if _, ok := err.(*exec.ExitError); !ok {
                    p.handler.Error(err)
                }
                c.fatal(err)
            }

            // Signal to whoever killed us (via killCh) that we are done
            if c.over != nil {
                c.over.done()
            }

            c.proc = nil
            c.waitCh = nil
            c.linesCh = nil
        case <-p.exitCh:
            return
        }
    }
}
相關文章
相關標籤/搜索