本文主要研究一下storagetapper的pipegit
storagetapper/pipe/pipe.gogithub
type Pipe interface { NewConsumer(topic string) (Consumer, error) NewProducer(topic string) (Producer, error) Type() string Config() *config.PipeConfig Close() error }
Pipe接口定義了NewConsumer、NewProducer、Type、Config、Close方法
storagetapper/pipe/pipe.gosql
type Consumer interface { Close() error //CloseOnFailure doesn't save offsets CloseOnFailure() error Message() chan interface{} Error() chan error FetchNext() (interface{}, error) //Allows to explicitly persists current consumer position SaveOffset() error //SetFormat allow to tell consumer the format of the file when there is no //header SetFormat(format string) }
Consumer接口定義了Close、CloseOnFailure、Message、Error、FetchNext、SaveOffset、SetFormat方法
storagetapper/pipe/pipe.goapp
type Producer interface { Push(data interface{}) error PushK(key string, data interface{}) error PushSchema(key string, data []byte) error //PushBatch queues the messages instead of sending immediately PushBatch(key string, data interface{}) error //PushCommit writes out all the messages queued by PushBatch PushBatchCommit() error Close() error CloseOnFailure() error SetFormat(format string) PartitionKey(source string, key string) string }
Producer接口定義了Push、PushK、PushSchema、PushBatch、PushBatchCommit、Close、CloseOnFailure、SetFormat、PartitionKey
storagetapper/pipe/pipe.gothis
func Create(pipeType string, cfg *config.PipeConfig, db *sql.DB) (Pipe, error) { init := Pipes[strings.ToLower(pipeType)] if init == nil { return nil, fmt.Errorf("unsupported pipe: %s", strings.ToLower(pipeType)) } pipe, err := init(cfg, db) if err != nil { return nil, err } return pipe, nil } type constructor func(cfg *config.PipeConfig, db *sql.DB) (Pipe, error) //Pipes is the list of registered pipes //Plugins insert their constructors into this map var Pipes map[string]constructor //registerPlugin should be called from plugin's init func registerPlugin(name string, init constructor) { if Pipes == nil { Pipes = make(map[string]constructor) } Pipes[name] = init }
Create方法根據pipeType、PipeConfig、db來建立pipe
storagetapper的Pipe接口定義了NewConsumer、NewProducer、Type、Config、Close方法,其Create方法根據pipeType、PipeConfig、db來建立pipe。code