Golang中如何在大二进制文件中查找[]byte的偏移量

Golang中如何在大二进制文件中查找[]byte的偏移量 如何在大型二进制文件中查找 []byte 的偏移量?

假设我有一个 5 MB 的文件,其中某处包含 “hello”。

如何找到文件中 “hello” 起始位置的偏移量(以字节数计)?

我正在寻找类似这样的方法(伪代码):

f = os.Open("file.bin")
defer f.Close()
search = []byte{"hello"}
offset, err = scanFile(f, search)

// 如果 "hello" 在该偏移量处,offset 现在应包含例如 1234567890

在 IRC 上,有人建议使用 bufio.Reader,但它只能扫描单个字节,而不能扫描 []byte。


更多关于Golang中如何在大二进制文件中查找[]byte的偏移量的实战教程也可以访问 https://www.itying.com/category-94-b0.html

9 回复

你的算法不正确。假设行结束标记是 \n,这是扫描器的默认设置。如果 stopWord 包含一个 \n,它可能跨越两行扫描内容。你的算法将无法找到它。

更多关于Golang中如何在大二进制文件中查找[]byte的偏移量的实战系列教程也可以访问 https://www.itying.com/category-94-b0.html


为什么不将整个文件读入内存并使用 bytes.Index() 呢?5 MB 的数据完全可以轻松放入内存。

func scanFile(f io.ReadSeeker, search []byte) (int64, error){
    data, err := ioutil.ReadAll(f)
    if err != nil {
        return 0, err
    }
    return bytes.Index(data, search), nil
}

5 MB 可以轻松地存放在内存中

虽然这在大多数情况下是正确的,但仍然存在争议。今天是 5MB,明天可能就是 5GB,届时我们将需要新的算法。我想这取决于主题发起者来决定对他来说哪种情况更可能发生——是需要解析 5GB 的文件,还是需要查找包含换行符的单词 :slight_smile:

以下是实现方法之一:

func ScanFile(fd io.Reader, stopWord []byte) int {
	scanner := bufio.NewScanner(fd)
	bytesRead := 0

	for scanner.Scan() {
		currentSlice := scanner.Bytes()
		currentIndex := bytes.Index(currentSlice, stopWord)
		if currentIndex >= 0 {
			return bytesRead + currentIndex
		}
		bytesRead += len(currentSlice) + 1 // account for EOL
	}
	return -1
}

你的算法不正确。

它无法在字符串 “aaab” 中找到字符串 “aab”。这是因为一旦你找到了前两个匹配的 “aa”,你在位置 2 处遇到了不匹配。然后你从位置 2 重新开始搜索,但实际上你应该从位置 1 重新开始搜索。

你遗漏了一个回溯步骤。

另一个问题是搜索位置。bufio 读取器会预读数据。所以 f 中的位置与我们刚刚读取的最后一个匹配字符的位置并不相同。

func main() {
    fmt.Println("hello world")
}

这似乎能解决问题。感谢 IRC 上的 @bpalmer

// ScanFile returns the offset of the first occurence of a []byte in a file,
// or -1 if []byte was not found in file, and seeks to the beginning of the searched []byte
func ScanFile(f io.ReadSeeker, search []byte) int64 {
	ix := 0
	r := bufio.NewReader(f)
	offset := int64(0)
	for ix < len(search) {
		b, err := r.ReadByte()
		if err != nil {
			return -1
		}
		if search[ix] == b {
			ix++
		} else {
			ix = 0
		}
		offset++
	}
	f.Seek(offset - int64(len(search)), 0 ) // Seeks to the beginning of the searched []byte
	return offset - int64(len(search))
}

Playground: https://play.golang.org/p/6nwRjbDCeng

这是一个使用简单 io.Reader 的代码版本。我们在每次读取时,将 len(search)-1 个字符作为尾部复制到缓冲区的前面。这确保了搜索可以跨越数据块的边界。

const chunkSize = 4096

func find(r io.Reader, search []byte) (int64, error) {
    var offset int64
    tailLen := len(search)-1
    chunk := make([]byte, chunkSize+tailLen)
    n, err := r.Read(chunk[taiLen:])
    idx := bytes.Index(chunk[tailLen:n+tailLen], search)
    for {
        if idx >= 0 {
            return offset + int64(idx), nil
        }
        if err == io.EOF {
            return -1, nil
        } else if err != nil {
            return -1, err
        }
        copy(chunk, chunk[chunkSize:])
        offset += chunkSize
        n, err = r.Read(chunk[tailLen:])
        idx = bytes.Index(chunk[:n+tailLen], search)
    }
}

我建议分块读取数据并逐块检查。请确保块与模式长度有重叠,以捕获跨越块边界的匹配实例。或许可以像下面这样:

const chunkSize = 4096

func find(r io.ReaderAt, search []byte) (int64, error) {
    var offset int64
	chunk := make([]byte, chunkSize+len(search))
	for {
		n, err := r.ReadAt(chunk, offset)
		idx := bytes.Index(chunk[:n], search)
		if idx >= 0 {
			return offset + int64(idx), nil
		}

		if err == io.EOF {
			break
		} else if err != nil {
			return -1, err
		}

		offset += chunkSize
	}
	return -1, errors.New("not found")
}

如果不进行重叠读取,I/O 模式会更简洁,但那样在搜索时就需要更巧妙的处理。像现在这样,我们可以直接依赖(已经正确且高效的)bytes.Index 函数,正如之前建议的。在证明需要更复杂的方案之前,简单优于巧妙。

在大型二进制文件中查找特定字节序列的偏移量,可以使用以下方法:

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"
)

func findOffset(file *os.File, search []byte) (int64, error) {
    const bufferSize = 32 * 1024 // 32KB缓冲区
    buffer := make([]byte, bufferSize)
    overlap := len(search) - 1
    var offset int64 = 0
    
    for {
        n, err := file.Read(buffer)
        if err != nil && err != io.EOF {
            return -1, err
        }
        
        // 在当前缓冲区中搜索
        if idx := bytes.Index(buffer[:n], search); idx != -1 {
            return offset + int64(idx), nil
        }
        
        // 处理缓冲区边界情况
        if n > overlap {
            // 保存最后overlap个字节用于下一次搜索
            copy(buffer[:overlap], buffer[n-overlap:n])
            offset += int64(n - overlap)
            // 将文件指针移回以重新读取重叠部分
            _, err = file.Seek(offset, io.SeekStart)
            if err != nil {
                return -1, err
            }
        }
        
        if err == io.EOF {
            break
        }
    }
    
    return -1, nil // 未找到
}

func main() {
    file, err := os.Open("file.bin")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    
    search := []byte("hello")
    offset, err := findOffset(file, search)
    if err != nil {
        panic(err)
    }
    
    if offset >= 0 {
        fmt.Printf("Found at offset: %d\n", offset)
    } else {
        fmt.Println("Not found")
    }
}

更高效的实现使用bufio.Reader配合自定义搜索:

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "os"
)

func findOffsetWithReader(file *os.File, search []byte) (int64, error) {
    reader := bufio.NewReader(file)
    searchLen := len(search)
    var offset int64 = 0
    window := make([]byte, searchLen)
    
    // 初始化窗口
    n, err := io.ReadFull(reader, window)
    if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
        return -1, err
    }
    
    for {
        // 检查当前窗口是否匹配
        if bytes.Equal(window[:n], search) {
            return offset, nil
        }
        
        // 读取下一个字节
        nextByte, err := reader.ReadByte()
        if err != nil {
            if err == io.EOF {
                break
            }
            return -1, err
        }
        
        // 滑动窗口
        if searchLen > 1 {
            copy(window, window[1:])
            window[searchLen-1] = nextByte
        } else {
            window[0] = nextByte
        }
        offset++
    }
    
    return -1, nil
}

func main() {
    file, err := os.Open("file.bin")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    
    search := []byte("hello")
    offset, err := findOffsetWithReader(file, search)
    if err != nil {
        panic(err)
    }
    
    if offset >= 0 {
        fmt.Printf("Found 'hello' at offset: %d\n", offset)
    } else {
        fmt.Println("Pattern not found")
    }
}

使用io.SectionReader进行内存映射式搜索:

package main

import (
    "bytes"
    "fmt"
    "io"
    "os"
)

func findOffsetMemoryEfficient(file *os.File, search []byte) (int64, error) {
    fileInfo, err := file.Stat()
    if err != nil {
        return -1, err
    }
    fileSize := fileInfo.Size()
    
    const chunkSize = 1 * 1024 * 1024 // 1MB chunks
    searchLen := len(search)
    
    for start := int64(0); start < fileSize; start += chunkSize {
        end := start + chunkSize + int64(searchLen) - 1
        if end > fileSize {
            end = fileSize
        }
        
        section := io.NewSectionReader(file, start, end-start)
        chunk := make([]byte, end-start)
        
        n, err := section.Read(chunk)
        if err != nil && err != io.EOF {
            return -1, err
        }
        
        if idx := bytes.Index(chunk[:n], search); idx != -1 {
            return start + int64(idx), nil
        }
        
        if end == fileSize {
            break
        }
    }
    
    return -1, nil
}

func main() {
    file, err := os.Open("file.bin")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    
    offset, err := findOffsetMemoryEfficient(file, []byte("hello"))
    if err != nil {
        panic(err)
    }
    
    if offset >= 0 {
        fmt.Printf("Offset: %d (0x%x)\n", offset, offset)
    }
}

这些方法都能在5MB文件中高效查找字节序列的偏移量。第一种方法使用固定缓冲区,第二种使用滑动窗口,第三种使用分块读取。根据具体需求选择合适的方法。

回到顶部