Golang操作MySQL复制输出的实践指南

Golang操作MySQL复制输出的实践指南 大家好,

我是一名MySQL数据库管理员,正在测试使用Golang来处理一些MySQL管理任务。

目前,我正在尝试处理一个名为“SHOW SLAVE STATUS”的MySQL命令的输出。

以下是从Linux/MySQL命令行看到的输出示例:

mysql> show slave status\G
*************************** 1. row ***************************
               Slave_IO_State: Waiting for master to send event
                  Master_Host: server.hostname.com
                  Master_User: replUser
                  Master_Port: 3306
                Connect_Retry: 10
              Master_Log_File: mysql-bin.000001
          Read_Master_Log_Pos: 999999999
               Relay_Log_File: mysql-relay-bin.000001
                Relay_Log_Pos: 999999999
        Relay_Master_Log_File: mysql-bin.000001
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
                   Last_Errno: 0
                   Last_Error: 
                 Skip_Counter: 0
          Exec_Master_Log_Pos: 376515189
              Relay_Log_Space: 376515572
              Until_Condition: None
               Until_Log_File: 
                Until_Log_Pos: 0
           Master_SSL_Allowed: No
        Seconds_Behind_Master: 0
Master_SSL_Verify_Server_Cert: No
                Last_IO_Errno: 0
                Last_IO_Error: 
               Last_SQL_Errno: 0
               Last_SQL_Error: 
             Master_Server_Id: 340472056
                  Master_UUID: a75ae239-4444-11e6-4444-1402ec6b1540
             Master_Info_File: /mnt/server/foldername/mysql/master.info
          SQL_Remaining_Delay: NULL
      Slave_SQL_Running_State: Slave has read all relay log; waiting for the slave I/O thread to update it
           Master_Retry_Count: 86400
      Last_IO_Error_Timestamp: 
     Last_SQL_Error_Timestamp: 
1 row in set (0.00 sec)

如您所见,输出内容非常详细,包含许多“列”。

这是我用来运行“SHOW SLAVE STATUS”命令的代码。

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	db, err := sql.Open("mysql",
		"$USER:$PASS@tcp($HOSTNAME:3306)/Testing")
	if err != nil {
		log.Println(err)
	}
	var str string
	err = db.QueryRow("SHOW SLAVE STATUS").Scan(&str)
	if err != nil && err != sql.ErrNoRows {
		log.Println(err)
	}
	fmt.Println(str)
	defer db.Close()
}

我希望将此输出保存到某个变量或对象中,以便后续从中选取所需信息。

以下是一些(不正确的)伪代码,用于解释我(认为)想要实现的目标。

//Fake golang code to explain what I'm trying to do
//Get slave status
slaveStatus := getSlaveStatus()

//Check if MySQL slave is running
patternToFind := regexp.Find('Slave_IO_Running: Yes', slaveStatus)

//Evaluate if MySQL replication is running
if regexp.MatchString("Slave_IO_Running: Yes", patternToFind) {
     fmt.Println("Slave is running") 
     } else {
     fmt.Println("Slave is not running") }

我有以下问题。

1.) 是否有函数或方法可以了解Golang是如何解释“SHOW SLAVE STATUS”数据的输出的?

它接收到的是映射、切片、字符串还是二进制大对象?我该如何了解Golang如何看待这个输出?

2.) 不同的MySQL服务器在“SHOW SLAVE STATUS”的输出中可能有不同的“列”。处理结果集列数不固定的最佳方式是什么?对于查询结果集中不断变化的列数,我应该如何处理?

====

任何建议、指导或意见都将不胜感激。谢谢!!


更多关于Golang操作MySQL复制输出的实践指南的实战教程也可以访问 https://www.itying.com/category-94-b0.html

6 回复

@thecartesian 我也遇到了同样的问题。如果你找到了解决方法,可以在这里贴出解决方案吗?

更多关于Golang操作MySQL复制输出的实践指南的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


以下是一些可能帮助您更好地理解这些内容的资源:

  1. http://go-database-sql.org/retrieving.html 这是关于结果检索的通用介绍
  2. http://go-database-sql.org/varcols.html 这是一种用于处理未知总列数的技术
  3. https://gist.github.com/SchumacherFM/69a167bec7dea644a20e 这是一段有趣的代码,用于将未知数量的列提取到一个字符串映射中。

你已经将切片中的每个元素都设为 *sql.RawBytes 类型,打印出来的是这些元素的地址。假设这些原始字节实际上代表一个字符串,你可以通过解引用和类型转换来打印它,例如:fmt.Println(string(*vals[0]))。但当然,这并不能保证总是如此。原始字节就是原始字节——它们是从 SQL 服务器返回的字节。通常来说,这是一个相当不方便处理的类型。

上面那篇关于“可变列”的文章含糊地提到了对结果使用“类型内省和类型断言”来获取实际类型,但我没看明白那具体是如何实现的。

我建议你弄清楚返回的类型,然后像那篇文章的第一个示例那样使用字符串、整数等类型。如果有必要,可以通过试错来完成。我相信 SHOW SLAVE STATUS 命令总是可以预期返回相同的列。


编辑: 你可以调用 rows.ColumnTypes() 来获取实际的类型。但这仍然相当繁琐。

请注意,你也可以直接在表中访问复制状态:https://dev.mysql.com/doc/refman/5.7/en/performance-schema-replication-tables.html

这让你可以对你需要的列进行更有控制的查询,并得到你期望的类型。

func main() {
    fmt.Println("hello world")
}

所以,我已经在下面代码块的底部完善了我的代码,但我遗漏了一些非常、非常基础的东西(对于我的新手水平,我提前表示歉意)。

如何将内存地址打印为字符串?

这段代码除了将值打印为字符串外,完全符合我的要求……我该如何让最后的 fmt.* 命令打印出字符串呢?

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/go-sql-driver/mysql"
)

func main() {
	//连接到数据库并检查错误
	db, err := sql.Open("mysql","username:password@tcp(127.0.0.1:3306)/Testing")
	if err != nil {
		log.Println(err)
	}
	//执行查询,检查错误,延迟关闭
	rows, err := db.Query("SELECT * FROM animals")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()

	//构建结果集??
	cols, err := rows.Columns() // 记得之后检查错误
	if err != nil {
		log.Fatal(err)
	}
  
  //构建接口并用值填充它
	vals := make([]interface{}, len(cols))
	for i, _ := range cols {
	vals[i] = new(sql.RawBytes)
}
  
  //对于'rows'中的每个值,将其扫描到接口中
	for rows.Next() {
	err = rows.Scan(vals...)
	// 现在你可以检查vals的每个元素是否为nil,
	if err != nil {
		log.Fatal(err)
	}
	//下面的Println命令打印值,但是
  	//它打印出来的是内存地址,像这样 => "[0xc42000c600 0xc42000c620]"
  	//↓↓↓↓我如何打印从MySQL检索到的字符串值???
	fmt.Println(vals) //<<<我如何防止这个家伙打印这些^^^^^内存地址??
	}

}

你好 @Tejas_Sangani

完全忘了这个帖子。

这里快速解释一下 database/sql 的工作原理,并提供一个编码解决方案,帮助你无论列类型和 MySQL 版本如何都能显示 SHOW SLAVE STATUS。

- 使用 Golang 查询 SLAVE STATUS -

当处理来自 MySQL 的 SLAVE STATUS 时,最典型的方法是使用 database/sql 中的 DB.Query() 方法。

db.Query() 返回一个 Rows 结构体。

Rows 有两个对你有用的方法:

  • Rows.Columns() - 提供列名
  • Rows.Scan() - 将 []Bytes 数据放入变量或数组

Golang 是一种“类型化”语言,因此你既需要为 SHOW SLAVE STATUS 中的每一列指定数据类型,也需要创建一个占位符(如 interface{})来满足 Go 编译器检查的类型化语法要求。

我发现动态获取列和行数据的最佳方法是分别提取列数据和行数据,然后将它们格式化在一起。

- 查询 MySQL 从库状态的示例代码 -

在这个例子中,我构建了一个 Rows 对象。分别提取列数据和行数据,然后将它们格式化打印在一起。

格式化后,你可以像我一样打印这些值,也可以对它们进行评估并采取其他相应的操作。

GITHUB 链接 以获得更好的代码高亮。

package main

import (
  "database/sql"
  "log"
  _ "github.com/go-sql-driver/mysql"
  "fmt"
  "strings"
)

func main() {

  //Open a connection to MySQL
  db, err := sql.Open("mysql", "username:password@tcp(hostname:mysql_port)/db_name")
  if err != nil {
    log.Fatal(err)
  }

  //No matter what happens, execute a db.Close() as the last thing we do
  defer db.Close()


  //Build sql.Rows object which contains our query data
  rows, err := db.Query("SHOW SLAVE STATUS")

  if err != nil {
    log.Fatal(err)
  }

  // sql.Rows has a function which returns all column names
  // as a slice of []string. Variable "columns" represents this
  columns, err := rows.Columns()
  if err != nil {
    log.Fatal(err)
  }

  // variable "values" is a pre-populated array of empty interfaces
  // We load an empty interface for every column 'sql.Rows' has.
  // The interfaces will allow us to call methods of any type that replaces it
  values := make([]interface{}, len(columns))

  // for every key we find while traversing array "columns"
  // set the corresponding interface in array "values" to be populated
  // with an empty sql.RawBytes type
  // sql.RawBytes is analogous to []byte
  for key, _ := range columns {
    values[key] = new(sql.RawBytes)
  }

  //Contrary to appearances, this is not a loop through every row
  // "rows.Next()"" is a recursive function that is called immediately
  // upon every row until we hit "rows.Next == false"
  // This is important because it means you must prepopulate variables or
  // arrays to the exact number of columns in the target SQL table
  // more details at: https://golang.org/pkg/database/sql/#Rows.Next
  for rows.Next() {
    //the "values..." tells Go to use every available slot to populate data
    err = rows.Scan(values...)
    if err != nil {
      log.Fatal(err)
    }
  }

  for index, columnName := range columns {
    // convert sql.RawBytes to String using "fmt.Sprintf"
    columnValue := fmt.Sprintf("%s", values[index])
    
    // Remove "&amp;" from row values
    columnValue = strings.Replace(columnValue, "&amp;", "", -1)
    
    // Optional: Don't display values that are NULL
    // Remove "if" to return empty NULL values
    if len(columnValue) == 0 {
      continue
    }

    // Print finished product
    fmt.Printf("%s: %s\n", columnName, columnValue)
  }
}

如果这个代码对任何人有用,请告诉我!

在Golang中处理SHOW SLAVE STATUS这类动态列数的查询,可以使用sql.Rows来遍历结果集。以下是具体实现:

package main

import (
    "database/sql"
    "fmt"
    "log"

    _ "github.com/go-sql-driver/mysql"
)

func main() {
    db, err := sql.Open("mysql", "$USER:$PASS@tcp($HOSTNAME:3306)/")
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    rows, err := db.Query("SHOW SLAVE STATUS")
    if err != nil {
        log.Fatal(err)
    }
    defer rows.Close()

    columns, err := rows.Columns()
    if err != nil {
        log.Fatal(err)
    }

    values := make([]sql.RawBytes, len(columns))
    scanArgs := make([]interface{}, len(values))
    for i := range values {
        scanArgs[i] = &values[i]
    }

    slaveStatus := make(map[string]string)
    
    for rows.Next() {
        err = rows.Scan(scanArgs...)
        if err != nil {
            log.Fatal(err)
        }

        for i, col := range values {
            if col == nil {
                slaveStatus[columns[i]] = "NULL"
            } else {
                slaveStatus[columns[i]] = string(col)
            }
        }
    }

    if err = rows.Err(); err != nil {
        log.Fatal(err)
    }

    // 访问特定字段
    if slaveStatus["Slave_IO_Running"] == "Yes" && slaveStatus["Slave_SQL_Running"] == "Yes" {
        fmt.Println("Slave is running")
    } else {
        fmt.Println("Slave is not running")
    }

    // 打印所有字段
    for key, value := range slaveStatus {
        fmt.Printf("%s: %s\n", key, value)
    }
}

对于动态列的处理,这里使用sql.RawBytesinterface{}切片来接收任意数量的列。rows.Columns()获取列名,然后通过rows.Scan()将每行数据扫描到values切片中。

检查复制状态的示例:

func checkReplicationStatus(status map[string]string) {
    ioRunning := status["Slave_IO_Running"]
    sqlRunning := status["Slave_SQL_Running"]
    secondsBehind := status["Seconds_Behind_Master"]

    fmt.Printf("IO Thread: %s\n", ioRunning)
    fmt.Printf("SQL Thread: %s\n", sqlRunning)
    fmt.Printf("Seconds Behind Master: %s\n", secondsBehind)

    if ioRunning == "Yes" && sqlRunning == "Yes" {
        if secondsBehind == "0" {
            fmt.Println("Replication is running and up to date")
        } else {
            fmt.Printf("Replication is running but lagging by %s seconds\n", secondsBehind)
        }
    } else {
        fmt.Println("Replication is not running properly")
        
        if status["Last_IO_Error"] != "" {
            fmt.Printf("Last IO Error: %s\n", status["Last_IO_Error"])
        }
        if status["Last_SQL_Error"] != "" {
            fmt.Printf("Last SQL Error: %s\n", status["Last_SQL_Error"])
        }
    }
}

处理多行结果的版本:

func getAllSlaveStatus(db *sql.DB) ([]map[string]string, error) {
    rows, err := db.Query("SHOW SLAVE STATUS")
    if err != nil {
        return nil, err
    }
    defer rows.Close()

    columns, err := rows.Columns()
    if err != nil {
        return nil, err
    }

    var results []map[string]string
    
    for rows.Next() {
        values := make([]sql.RawBytes, len(columns))
        scanArgs := make([]interface{}, len(values))
        for i := range values {
            scanArgs[i] = &values[i]
        }

        err = rows.Scan(scanArgs...)
        if err != nil {
            return nil, err
        }

        rowMap := make(map[string]string)
        for i, col := range values {
            if col == nil {
                rowMap[columns[i]] = "NULL"
            } else {
                rowMap[columns[i]] = string(col)
            }
        }
        results = append(results, rowMap)
    }

    return results, rows.Err()
}

这种方法通过map[string]string存储结果,可以处理任意数量和名称的列,适应不同MySQL版本的SHOW SLAVE STATUS输出差异。

回到顶部