diff --git a/dq/producer.go b/dq/producer.go index d2905b9..7e07dc9 100644 --- a/dq/producer.go +++ b/dq/producer.go @@ -19,12 +19,13 @@ const ( type ( Producer interface { - atWithWrapper(body []byte, at time.Time) (string, error) At(body []byte, at time.Time) (string, error) Close() error - delayWithWrapper(body []byte, delay time.Duration) (string, error) Delay(body []byte, delay time.Duration) (string, error) Revoke(ids string) error + + at(body []byte, at time.Time) (string, error) + delay(body []byte, delay time.Duration) (string, error) } producerCluster struct { @@ -57,9 +58,7 @@ func NewProducer(beanstalks []Beanstalk) Producer { func (p *producerCluster) At(body []byte, at time.Time) (string, error) { wrapped := wrap(body, at) - return p.insert(func(node Producer) (string, error) { - return node.atWithWrapper(wrapped, at) - }) + return p.at(wrapped, at) } func (p *producerCluster) Close() error { @@ -74,9 +73,7 @@ func (p *producerCluster) Close() error { func (p *producerCluster) Delay(body []byte, delay time.Duration) (string, error) { wrapped := wrap(body, time.Now().Add(delay)) - return p.insert(func(node Producer) (string, error) { - return node.delayWithWrapper(wrapped, delay) - }) + return p.delay(wrapped, delay) } func (p *producerCluster) Revoke(ids string) error { @@ -98,10 +95,22 @@ func (p *producerCluster) Revoke(ids string) error { return be.Err() } +func (p *producerCluster) at(body []byte, at time.Time) (string, error) { + return p.insert(func(node Producer) (string, error) { + return node.at(body, at) + }) +} + func (p *producerCluster) cloneNodes() []Producer { return append([]Producer(nil), p.nodes...) } +func (p *producerCluster) delay(body []byte, delay time.Duration) (string, error) { + return p.insert(func(node Producer) (string, error) { + return node.delay(body, delay) + }) +} + func (p *producerCluster) getWriteNodes() []Producer { if len(p.nodes) <= replicaNodes { return p.nodes @@ -156,15 +165,3 @@ func (p *producerCluster) insert(fn func(node Producer) (string, error)) (string return "", be.Err() } - -func (p *producerCluster) atWithWrapper(body []byte, at time.Time) (string, error) { - return p.insert(func(node Producer) (string, error) { - return node.atWithWrapper(body, at) - }) -} - -func (p *producerCluster) delayWithWrapper(body []byte, delay time.Duration) (string, error) { - return p.insert(func(node Producer) (string, error) { - return node.delayWithWrapper(body, delay) - }) -} diff --git a/dq/producernode.go b/dq/producernode.go index 0ebd30c..2d32634 100644 --- a/dq/producernode.go +++ b/dq/producernode.go @@ -28,28 +28,56 @@ func NewProducerNode(endpoint, tube string) Producer { } func (p *producerNode) At(body []byte, at time.Time) (string, error) { - return p.atWithWrapper(wrap(body, at), at) + return p.at(wrap(body, at), at) } -func (p *producerNode) atWithWrapper(body []byte, at time.Time) (string, error) { +func (p *producerNode) Close() error { + return p.conn.Close() +} + +func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) { + return p.delay(wrap(body, time.Now().Add(delay)), delay) +} + +func (p *producerNode) Revoke(jointId string) error { + ids := strings.Split(jointId, idSep) + for _, id := range ids { + fields := strings.Split(id, "/") + if len(fields) < 3 { + continue + } + if fields[0] != p.endpoint || fields[1] != p.tube { + continue + } + + conn, err := p.conn.get() + if err != nil { + return err + } + + n, err := strconv.ParseUint(fields[2], 10, 64) + if err != nil { + return err + } + + return conn.Delete(n) + } + + // if not in this beanstalk, ignore + return nil +} + +func (p *producerNode) at(body []byte, at time.Time) (string, error) { now := time.Now() if at.Before(now) { return "", ErrTimeBeforeNow } duration := at.Sub(now) - return p.delayWithWrapper(body, duration) + return p.delay(body, duration) } -func (p *producerNode) Close() error { - return p.conn.Close() -} - -func (p *producerNode) Delay(body []byte, delay time.Duration) (string, error) { - return p.delayWithWrapper(wrap(body, time.Now().Add(delay)), delay) -} - -func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (string, error) { +func (p *producerNode) delay(body []byte, delay time.Duration) (string, error) { conn, err := p.conn.get() if err != nil { return "", err @@ -90,31 +118,3 @@ func (p *producerNode) delayWithWrapper(body []byte, delay time.Duration) (strin return "", err } - -func (p *producerNode) Revoke(jointId string) error { - ids := strings.Split(jointId, idSep) - for _, id := range ids { - fields := strings.Split(id, "/") - if len(fields) < 3 { - continue - } - if fields[0] != p.endpoint || fields[1] != p.tube { - continue - } - - conn, err := p.conn.get() - if err != nil { - return err - } - - n, err := strconv.ParseUint(fields[2], 10, 64) - if err != nil { - return err - } - - return conn.Delete(n) - } - - // if not in this beanstalk, ignore - return nil -}