NSQ Tutorial

I am looking for NSQ tutorial beyond what Traun Leyden and Guillaume Charmes have provided in their respective posts. In particular, I am looking for a way to decouple nsqd, nsqlookupd, nsq producer, nsq consumer.

After a few days wrestling with the documentation and a few Turnkey Linux virtual machines (using only VirtualBox, no Vagrant here), here is my eureka moment. NSQ assumes a fairly basic knowledge of Linux (just a couple of CLI) but thanks to Turnkey Linux, there is the great integration of Web Shell and Webmin.

Here is the verbatim copy from my GitHub repo minus the comments.

Decoupling nsqd, nsqlookupd, nsq producer, nsq consumer

On VirtualBox, run three Turnkey Linux VMs (using host-only adapter)

192.168.56.101 - nsqlookupd
192.168.56.102 - nsqd and nsq producer client
192.168.56.103 - nsq consumer client

192.168.56.101

nsqlookupd

192.168.56.102

--lookupd-tcp-address = IP address of nsqlookupd
--broadcast-address = IP address of machine where nsqd is running
nsqd --lookupd-tcp-address=192.168.56.101:4160 --broadcast-address=192.168.56.102

On the same node as nsqd is the nsq producer client

//NSQ Producer Client
package main
import (
    "flag"
    "fmt"
    "github.com/ibmendoza/go-lib"
    "github.com/nsqio/go-nsq"
    "log"
    "math/rand"
    "time"
)
var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!@#$%^&*()1234567890")
var numbPtr = flag.Int("msg", 10000, "number of messages (default: 10000)")
func randSeq(n int) string {
    b := make([]rune, n)
    for i := range b {
        b[i] = letters[rand.Intn(len(letters))]
    }
    return string(b)
}
func main() {
    config := nsq.NewConfig()
    ipaddr, _ := lib.GetIPAddress()
    w, err := nsq.NewProducer(ipaddr+":4150", config)
    if err != nil {
        log.Fatal("Could not connect")
    }
    flag.Parse()
    start := time.Now()
    for i := 1; i <= *numbPtr; i++ {
        w.Publish("test", []byte(randSeq(320)))
    }
    elapsed := time.Since(start)
    log.Printf("Time took %s", elapsed)
    w.Stop()
    fmt.Scanln()
}

NSQ Consumer Client

Test nsq consumer client at 192.168.56.103

//NSQ Consumer Client
package main
import (
    "flag"
    "fmt"
    "github.com/itmarketplace/go-queue"
    "github.com/nsqio/go-nsq"
    "log"
    "runtime"
    "sync/atomic"
    "time"
)
var start = time.Now()
var ops uint64 = 0
var numbPtr = flag.Int("msg", 10000, "number of messages (default: 10000)")
var lkp = flag.String("lkp", "", "IP address of nsqlookupd")
func main() {
    flag.Parse()
    c := queue.NewConsumer("test", "ch")
    c.Set("nsqlookupd", *lkp+":4161")
    c.Set("concurrency", runtime.GOMAXPROCS(runtime.NumCPU()))
    c.Set("max_attempts", 10)
    c.Set("max_in_flight", 150)
    c.Set("default_requeue_delay", "15s")
    c.Start(nsq.HandlerFunc(func(msg *nsq.Message) error {
        atomic.AddUint64(&ops, 1)
        if ops == uint64(*numbPtr) {
            elapsed := time.Since(start)
            log.Printf("Time took %s", elapsed)
        }
        //log.Println(string(msg.Body))
        return nil
    }))
    fmt.Scanln()
}
Advertisements

Subjectivity aside, leave a reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s