纯真 IP 数据库解析 Node.js 版

发布于 1周前 作者 yuanlaile 来自 nodejs/Nestjs

用 Node.js 写了一个解析纯真 IP 数据库的项目:纯真 IP 数据库解析 Nest.js 版

支持以下特性:

  1. 提供国家、省、市、区、运营商等字段的解析;
  2. 通过定时任务定时更新 IP 数据库;
  3. IP 搜索、批量搜索;
  4. 查询 IP 数据库版本信息。

其中,IP 数据库来源:qqwry.dat

如果是数据库方式,还可以自定义各种模糊搜索、按字段搜索等(性能是个问题)。

该项目已在 ifuyun.com 上使用。

最后,感谢算法提供:qqwry.dat-analyse


纯真 IP 数据库解析 Node.js 版

9 回复

不错,待会我整个 Python 版的。


不错,待会我整个 Rust 版的。

Python 版我写好了
<br># -*- coding: UTF-8 -*-<br><br>import socket<br>import struct<br><br><br>class CzIp:<br><br> def __init__(self, db_file="qqwry2024-08-28.dat"): # db_file="qqwry.dat"<br> self.f_db = open(db_file, "rb")<br> bs = self.f_db.read(8)<br> (self.first_index, self.last_index) = struct.unpack("II", bs)<br> self.index_count = int((self.last_index - self.first_index) / 7 + 1)<br> self.cur_start_ip = None<br> self.cur_end_ip_offset = None<br> self.cur_end_ip = None<br> print(self.get_version(), " 记录总数: %d 条 " % (self.index_count))<br><br> def get_version(self):<br> """<br> 获取版本信息,最后一条 IP 记录 255.255.255.0-255.255.255.255 是版本信息<br> :return: str<br> """<br> s = self.get_addr_by_ip(0xFFFFFF00)<br> return s<br><br> def _get_area_addr(self, offset=0):<br> if offset:<br> self.f_db.seek(offset)<br> bs = self.f_db.read(1)<br> (byte,) = struct.unpack("B", bs)<br> if byte == 0x01 or byte == 0x02:<br> p = self.getLong3()<br> if p:<br> return self.get_offset_string(p)<br> else:<br> return ""<br> else:<br> self.f_db.seek(-1, 1)<br> return self.get_offset_string(offset)<br><br> def _get_addr(self, offset):<br> """<br> 获取 offset 处记录区地址信息(包含国家和地区)<br> 如果是中国 ip ,则是 "xx 省 xx 市 xxxxx 地区" 这样的形式<br> (比如:"福建省 电信", "澳大利亚 墨尔本 Goldenit 有限公司")<br> :param offset:<br> :return:str<br> """<br> self.f_db.seek(offset + 4)<br> bs = self.f_db.read(1)<br> (byte,) = struct.unpack("B", bs)<br> if byte == 0x01: # 重定向模式 1<br> country_offset = self.getLong3()<br> self.f_db.seek(country_offset)<br> bs = self.f_db.read(1)<br> (b,) = struct.unpack("B", bs)<br> if b == 0x02:<br> country_addr = self.get_offset_string(self.getLong3())<br> self.f_db.seek(country_offset + 4)<br> else:<br> country_addr = self.get_offset_string(country_offset)<br> area_addr = self._get_area_addr()<br> elif byte == 0x02: # 重定向模式 2<br> country_addr = self.get_offset_string(self.getLong3())<br> area_addr = self._get_area_addr(offset + 8)<br> else: # 字符串模式<br> country_addr = self.get_offset_string(offset + 4)<br> area_addr = self._get_area_addr()<br> return country_addr + " " + area_addr<br><br> def dump(self, first, last):<br> """<br> 打印数据库中索引为 first 到索引为 last(不包含 last)的记录<br> :param first:<br> :param last:<br> :return:<br> """<br> if last &gt; self.index_count:<br> last = self.index_count<br> for index in range(first, last):<br> offset = self.first_index + index * 7<br> self.f_db.seek(offset)<br> buf = self.f_db.read(7)<br> (ip, of1, of2) = struct.unpack("IHB", buf)<br> address = self._get_addr(of1 + (of2 &lt;&lt; 16))<br> print("%d %s %s" % (index, self.ip2str(ip), address))<br><br> def _set_ip_range(self, index):<br> offset = self.first_index + index * 7<br> self.f_db.seek(offset)<br> buf = self.f_db.read(7)<br> (self.cur_start_ip, of1, of2) = struct.unpack("IHB", buf)<br> self.cur_end_ip_offset = of1 + (of2 &lt;&lt; 16)<br> self.f_db.seek(self.cur_end_ip_offset)<br> buf = self.f_db.read(4)<br> (self.cur_end_ip,) = struct.unpack("I", buf)<br><br> def get_addr_by_ip(self, ip):<br> """<br> 通过 ip 查找其地址<br> :param ip: (int or str)<br> :return: str<br> """<br> if type(ip) == str:<br> ip = self.str2ip(ip)<br> L = 0<br> R = self.index_count - 1<br> while L &lt; R - 1:<br> M = int((L + R) / 2)<br> self._set_ip_range(M)<br> if ip == self.cur_start_ip:<br> L = M<br> break<br> if ip &gt; self.cur_start_ip:<br> L = M<br> else:<br> R = M<br> self._set_ip_range(L)<br> # version information, 255.255.255.X, urgy but useful<br> if ip &amp; 0xFFFFFF00 == 0xFFFFFF00:<br> self._set_ip_range(R)<br> if self.cur_start_ip &lt;= ip &lt;= self.cur_end_ip:<br> address = self._get_addr(self.cur_end_ip_offset)<br> else:<br> address = "未找到该 IP 的地址"<br> return address<br><br> def get_ip_range(self, ip):<br> """<br> 返回 ip 所在记录的 IP 段<br> :param ip: ip(str or int)<br> :return: str<br> """<br> if type(ip) == str:<br> ip = self.str2ip(ip)<br> self.get_addr_by_ip(ip)<br> range = self.ip2str(self.cur_start_ip) + " - " + self.ip2str(self.cur_end_ip)<br> return range<br><br> def get_offset_string(self, offset=0):<br> """<br> 获取文件偏移处的字符串(以'\0'结尾)<br> :param offset: 偏移<br> :return: str<br> """<br> if offset:<br> self.f_db.seek(offset)<br> bs = b""<br> ch = self.f_db.read(1)<br> (byte,) = struct.unpack("B", ch)<br> while byte != 0:<br> bs += ch<br> ch = self.f_db.read(1)<br> (byte,) = struct.unpack("B", ch)<br> return bs.decode("gbk")<br><br> def ip2str(self, ip):<br> """<br> 整数 IP 转化为 IP 字符串<br> :param ip:<br> :return:<br> """<br> return str(ip &gt;&gt; 24) + "." + str((ip &gt;&gt; 16) &amp; 0xFF) + "." + str((ip &gt;&gt; 8) &amp; 0xFF) + "." + str(ip &amp; 0xFF)<br><br> def str2ip(self, s):<br> """<br> IP 字符串转换为整数 IP<br> :param s:<br> :return:<br> """<br> (ip,) = struct.unpack("I", socket.inet_aton(s))<br> return ((ip &gt;&gt; 24) &amp; 0xFF) | ((ip &amp; 0xFF) &lt;&lt; 24) | ((ip &gt;&gt; 8) &amp; 0xFF00) | ((ip &amp; 0xFF00) &lt;&lt; 8)<br><br> def getLong3(self, offset=0):<br> """<br> 3 字节的数值<br> :param offset:<br> :return:<br> """<br> if offset:<br> self.f_db.seek(offset)<br> bs = self.f_db.read(3)<br> (a, b) = struct.unpack("HB", bs)<br> return (b &lt;&lt; 16) + a<br><br><br>if __name__ == "__main__":<br> # todo:纯真 IP 库解析<br> cz = CzIp()<br> # print(cz.get_version())<br> ip = "8.8.8.8"<br> print(cz.get_ip_range(ip))<br> print(cz.get_addr_by_ip(ip))<br> print("====")<br> ip = "125.129.173.203"<br> print(cz.get_ip_range(ip))<br> print(cz.get_addr_by_ip(ip))<br>

真是机灵鬼!

不错,待会我整个 PHP 版的。

马上停止更新了还做这个干嘛

Nest.js 搞一大堆 module 的项目我都感觉特别特别蠢。还得处理 module 之间的依赖关系特别麻烦。我所在的公司的 Nest.js 的项目都是一个 module 底下挂一大堆 controller 和 service 。
另外,其实这东西做成那种可配置的 module 也是挺不错的。

这种公共的、基础的服务本来就是可以独立成类似 Nest.js 官方的一些模块。这个项目为了能够独立运行,才做成了一个完整的 APP 。后续条件允许,再考虑模块化成 npm 包。
另外,就 module 而言,见仁见智吧,我不喜欢大杂烩,分工本来也有这一层意思。或许更好的是,像 Angular 那样,再抽象出一个 Standalone 层面的,也就无需 import 整个 module 。

纯真 IP 数据库(CZ88 IP 数据库)是一个常见的免费 IP 地理位置数据库,可以通过 Node.js 来解析和查询该数据库。以下是一个简单的 Node.js 示例,展示如何读取并解析纯真 IP 数据库(假设数据库文件为 qqwry.dat)。

首先,你需要一个解析纯真数据库的库,比如 qqwry-node。可以通过 npm 安装:

npm install qqwry-node

然后,可以使用以下代码来读取并查询 IP 地址:

const fs = require('fs');
const path = require('path');
const qqwry = require('qqwry-node');

// 读取数据库文件
const filePath = path.join(__dirname, 'qqwry.dat');
const data = fs.readFileSync(filePath);

// 初始化解析器
const parser = new qqwry.Parser(data);

// 查询 IP 地址
const ip = '8.8.8.8';
const result = parser.lookup(ip);

if (result) {
    console.log(`IP: ${ip}`);
    console.log(`Country: ${result.country}`);
    console.log(`Area: ${result.area}`);
} else {
    console.log(`IP ${ip} not found in the database.`);
}

在这段代码中,我们首先读取了纯真 IP 数据库文件 qqwry.dat,然后初始化了一个解析器对象。接下来,我们使用 lookup 方法查询特定的 IP 地址,并打印出查询结果中的国家和地区信息。

请注意,纯真 IP 数据库需要定期更新以保持数据的准确性,因此在实际应用中,你需要定期下载最新的数据库文件并替换旧的数据库文件。

回到顶部