Provides a native Redis server in front of Redis + Sentinel.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

182 lines
4.7 KiB

  1. package main
  2. import (
  3. "errors"
  4. "flag"
  5. "fmt"
  6. "io"
  7. "log"
  8. "net"
  9. "strings"
  10. "sync"
  11. "time"
  12. )
  // Command-line flags. Each pointer holds the flag's default value until
  // flag.Parse (called from main) overwrites it with the command-line value.
  var (
  	localAddr    = flag.String("listen", ":9999", "local address")
  	sentinelAddr = flag.String("sentinel", ":26379", "remote address")
  	masterName   = flag.String("master", "", "name of the master redis node")
  )

  // State shared between goroutines. Both mutex pointers start out nil and are
  // allocated by main before any goroutine is started.
  var (
  	// slock guards saddr, the resolved address of the sentinel service.
  	// saddr is nil whenever the address still has to be (re-)resolved.
  	slock *sync.Mutex
  	saddr *net.TCPAddr

  	// mlock guards masterAddr, the most recently discovered address of the
  	// redis master. masterAddr is nil until the first successful lookup.
  	mlock      *sync.Mutex
  	masterAddr *net.TCPAddr
  )
  22. func main() {
  23. flag.Parse()
  24. slock = &sync.Mutex{}
  25. mlock = &sync.Mutex{}
  26. laddr, err := net.ResolveTCPAddr("tcp", *localAddr)
  27. if err != nil {
  28. log.Fatalf("Failed to resolve local address: %s", err.Error())
  29. }
  30. resolveSentinel(*sentinelAddr)
  31. // If sentinel's address is set to nil, this goroutine will resolve it and set the var again
  32. go sentinelUpdater(*sentinelAddr)
  33. // Continuously query sentinel for the master address, updating masterAddr when needed
  34. go master()
  35. listener, err := net.ListenTCP("tcp", laddr)
  36. if err != nil {
  37. log.Fatal(err)
  38. }
  39. for {
  40. conn, err := listener.AcceptTCP()
  41. if err != nil {
  42. log.Println(err)
  43. continue
  44. }
  45. go proxy(conn, masterAddr)
  46. }
  47. }
  48. func sentinelUpdater(sentinelAddr string) {
  49. // Resolve the address of sentinel when needed
  50. for {
  51. if saddr == nil {
  52. log.Print("Resolving sentinel address")
  53. resolveSentinel(sentinelAddr)
  54. }
  55. time.Sleep(1 * time.Second)
  56. }
  57. }
  58. func resolveSentinel(sentinelAddr string) {
  59. // var err error
  60. addr, err := net.ResolveTCPAddr("tcp", sentinelAddr)
  61. if err != nil {
  62. log.Printf("Failed to resolve sentinel address: %s", err.Error())
  63. return
  64. }
  65. slock.Lock()
  66. saddr = addr
  67. slock.Unlock()
  68. // TODO other cases when saddr isn't valid
  69. }
  70. func master() {
  71. var err error
  72. var tempAddr *net.TCPAddr
  73. for {
  74. tempAddr, err = getMasterAddr()
  75. if err != nil {
  76. log.Printf("Failed to get master addres: %s", err.Error())
  77. } else {
  78. mlock.Lock()
  79. masterAddr = tempAddr
  80. mlock.Unlock()
  81. }
  82. time.Sleep(1 * time.Second)
  83. }
  84. }
  // pipe copies everything it can read from r into w and then closes w,
  // regardless of whether the copy stopped at EOF or on an error.
  func pipe(r io.Reader, w io.WriteCloser) {
  	// Errors are deliberately dropped: the copy ending for any reason just
  	// means this direction of the stream is finished.
  	_, _ = io.Copy(w, r)
  	_ = w.Close()
  }
  89. func proxy(local io.ReadWriteCloser, remoteAddr *net.TCPAddr) {
  90. remote, err := net.DialTCP("tcp", nil, remoteAddr)
  91. if err != nil {
  92. log.Println(err)
  93. local.Close()
  94. return
  95. }
  96. go pipe(local, remote)
  97. go pipe(remote, local)
  98. }
  99. // Connect to Sentinel and query it to find the redis master
  100. func getMasterAddr() (*net.TCPAddr, error) {
  101. // Connect to sentinel
  102. // If the connection times out, that master is probably gone.
  103. // Mark saddr as nil so that the resolver thread will update it later.
  104. // Create a local copy of the sentinel address, it can change under our feet
  105. slock.Lock()
  106. if saddr == nil {
  107. defer slock.Unlock()
  108. return nil, errors.New("Sentinel address not available")
  109. }
  110. local_saddr := *saddr
  111. slock.Unlock()
  112. sentConn, err := dialTimeout(&local_saddr, 5*time.Second)
  113. if err != nil {
  114. log.Printf("Connecting to sentinel master timed out/failed: %s\n", err.Error())
  115. slock.Lock()
  116. saddr = nil
  117. slock.Unlock()
  118. return nil, err
  119. }
  120. defer sentConn.Close()
  121. // We connected to the master, ask for the redis master
  122. sentConn.Write([]byte(fmt.Sprintf("sentinel get-master-addr-by-name %s\n", *masterName)))
  123. b := make([]byte, 256)
  124. _, err = sentConn.Read(b)
  125. if err != nil {
  126. return nil, err
  127. }
  128. parts := strings.Split(string(b), "\r\n")
  129. if len(parts) < 5 {
  130. err = errors.New("Couldn't get master address from sentinel")
  131. return nil, err
  132. }
  133. // Parse the address for the master node
  134. stringaddr := fmt.Sprintf("%s:%s", parts[2], parts[4])
  135. addr, err := net.ResolveTCPAddr("tcp", stringaddr)
  136. if err != nil {
  137. return nil, err
  138. }
  139. // Verify the returned address is actually listening
  140. // TODO is this really needed?
  141. conn2, err := dialTimeout(addr, 5*time.Second)
  142. if err != nil {
  143. return nil, err
  144. }
  145. defer conn2.Close()
  146. return addr, err
  147. }
  // dialTimeout connects to destAddr over TCP, failing if the connection is not
  // established within timeout or any other error is encountered.
  //
  // It returns an error (rather than panicking) when destAddr is nil, and never
  // returns a nil connection together with a nil error.
  func dialTimeout(destAddr *net.TCPAddr, timeout time.Duration) (*net.TCPConn, error) {
  	if destAddr == nil {
  		return nil, errors.New("dial tcp: destination address is nil")
  	}

  	d := net.Dialer{Timeout: timeout}
  	// TCPAddr.String brackets IPv6 literals, keeps the zone, and leaves the
  	// host empty when no IP is set, so every resolved address can be dialed.
  	netcon, err := d.Dial("tcp", destAddr.String())
  	if err != nil {
  		return nil, err
  	}

  	conn, ok := netcon.(*net.TCPConn)
  	if !ok {
  		netcon.Close()
  		return nil, fmt.Errorf("dial tcp %s: unexpected connection type %T", destAddr, netcon)
  	}
  	return conn, nil
  }