package main import ( "errors" "flag" "fmt" "io" "log" "net" "strings" "sync" "time" ) var ( saddr *net.TCPAddr // Address of the sentinel service slock *sync.Mutex // Guard for above var masterAddr *net.TCPAddr // Address of the redis master mlock *sync.Mutex // Guard for above var localAddr = flag.String("listen", ":9999", "local address") sentinelAddr = flag.String("sentinel", ":26379", "remote address") masterName = flag.String("master", "", "name of the master redis node") ) func main() { flag.Parse() slock = &sync.Mutex{} mlock = &sync.Mutex{} laddr, err := net.ResolveTCPAddr("tcp", *localAddr) if err != nil { log.Fatalf("Failed to resolve local address: %s", err.Error()) } resolveSentinel(*sentinelAddr) // If sentinel's address is set to nil, this goroutine will resolve it and set the var again go sentinelUpdater(*sentinelAddr) // Continuously query sentinel for the master address, updating masterAddr when needed go master() listener, err := net.ListenTCP("tcp", laddr) if err != nil { log.Fatal(err) } for { conn, err := listener.AcceptTCP() if err != nil { log.Println(err) continue } go proxy(conn, masterAddr) } } func sentinelUpdater(sentinelAddr string) { // Resolve the address of sentinel when needed for { if saddr == nil { log.Print("Resolving sentinel address") resolveSentinel(sentinelAddr) } time.Sleep(1 * time.Second) } } func resolveSentinel(sentinelAddr string) { // var err error addr, err := net.ResolveTCPAddr("tcp", sentinelAddr) if err != nil { log.Printf("Failed to resolve sentinel address: %s", err.Error()) return } slock.Lock() saddr = addr slock.Unlock() // TODO other cases when saddr isn't valid } func master() { var err error var tempAddr *net.TCPAddr for { tempAddr, err = getMasterAddr() if err != nil { log.Printf("Failed to get master addres: %s", err.Error()) } else { mlock.Lock() masterAddr = tempAddr mlock.Unlock() } time.Sleep(1 * time.Second) } } func pipe(r io.Reader, w io.WriteCloser) { io.Copy(w, r) w.Close() } func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr) { remote, err := net.DialTCP("tcp", nil, remoteAddr) if err != nil { log.Println(err) local.Close() return } go pipe(local, remote) go pipe(remote, local) } // Connect to Sentinel and query it to find the redis master func getMasterAddr() (*net.TCPAddr, error) { // Connect to sentinel // If the connection times out, that master is probably gone. // Mark saddr as nil so that the resolver thread will update it later. // Create a local copy of the sentinel address, it can change under our feet slock.Lock() if saddr == nil { defer slock.Unlock() return nil, errors.New("Sentinel address not available") } local_saddr := *saddr slock.Unlock() sentConn, err := dialTimeout(&local_saddr, 5*time.Second) if err != nil { log.Printf("Connecting to sentinel master timed out/failed: %s\n", err.Error()) slock.Lock() saddr = nil slock.Unlock() return nil, err } defer sentConn.Close() // We connected to the master, ask for the redis master sentConn.Write([]byte(fmt.Sprintf("sentinel get-master-addr-by-name %s\n", *masterName))) b := make([]byte, 256) _, err = sentConn.Read(b) if err != nil { return nil, err } parts := strings.Split(string(b), "\r\n") if len(parts) < 5 { err = errors.New("Couldn't get master address from sentinel") return nil, err } // Parse the address for the master node stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4]) addr, err := net.ResolveTCPAddr("tcp", stringaddr) if err != nil { return nil, err } // Verify the returned address is actually listening // TODO is this really needed? conn2, err := dialTimeout(addr, 5*time.Second) if err != nil { return nil, err } defer conn2.Close() return addr, err } // Connect to a TCPAddr, failing if a timeout is exceeded or other error encountered func dialTimeout(destAddr *net.TCPAddr, timeout time.Duration) (*net.TCPConn, error) { d := net.Dialer{Timeout: timeout} netcon, err := d.Dial("tcp", fmt.Sprintf("%s:%d", destAddr.IP, destAddr.Port)) if err != nil { return nil, err } conn, _ := netcon.(*net.TCPConn) return conn, nil }