Nodejs 请问怎么在主调方法内获取异步请求的回调函数的返回值?如下:

Nodejs 请问怎么在主调方法内获取异步请求的回调函数的返回值?如下:

var csvpath = “C:/Users/name/Desktop/address.csv”;
var requrl = ‘http://api.map.baidu.com/geocoder/v2/?ak=xxxxxxxxx&output=json&pois=1&location=’;
//需求:csv读取每行,异步请求获取返回结果,结果输出到result.csv
csv().from.path(csvpath, {header: true, columns: true})
.transform(function(row,index){
if(index < 2) return;
if(!row){
throw new Error(‘error on import row index #’+index+’ '+JSON.stringify(row));
}
var i = row.length;
var location = row.lat.trim() + ‘,’ + row.lng.trim();
//异步请求,在回调函数中处理结果返回,怎么在外面获取这个返回值???
//transform内部需要return一个结果,输出到result.csv中
//想实现请求时阻塞直到返回结果,并在transform中返回,请问怎么做呢?
//一直对异步和回调理解不够
request.get({url:requrl+location}, function (error, response, body) {
if(!error && response.statusCode == 200 && body.status == 0) {
var resString = ‘’;
body = JSON.stringify(body);
resString = index + “,” + body.location.lat + “,” + body.location.lng + “,”
+ body.formatted_address + “,” + body.business + “,” + body.addressComponent.city + “,”
+ body.addressComponent.district + “,” + body.addressComponent.province + “,”
+ body.addressComponent.street + “,” + body.addressComponent.street_number;
var pois = body.pois;
for(var i=0; i<pois.length; i++){
var poi = pois[i];
resString += “,” + poi.name + “,” + poi.poiType + “,” + poi.addr + “,”
+ poi.point.x + “,” + poi.point.y + “,” + poi.distance;
}
return resString;

        }else{
            console.log(index + "--- " + error + "---" + response.statusCode);
            //console.log(index + " error : " + body);
            return index + ", error ";
        }
    });
    console.log(res);
    return res;
})
.on('end', function(count){
    console.log('Number of lines: '+count);
})
.on('error', function(error){
    console.log(error);
    return next(error);
})
.to.path('./result.csv');


7 回复

在Node.js中处理异步操作,特别是涉及到I/O操作(如HTTP请求)时,通常使用回调函数、Promise或者async/await来管理。在这个例子中,我们可以通过使用async/await结合Promise来简化异步请求的处理,使得代码更加直观和易于理解。

解决方案

我们可以封装request.get为一个返回Promise的函数,然后在transform方法中使用async/await来等待异步请求的结果。

示例代码:

首先,我们需要安装request-promise-native库,这是一个基于request库的Promise封装。

npm install request-promise-native

然后,我们可以修改代码如下:

const fs = require('fs');
const path = require('path');
const csv = require('csv-parser');
const request = require('request-promise-native');

const csvPath = "C:/Users/name/Desktop/address.csv";
const resultPath = './result.csv';
const reqUrl = 'http://api.map.baidu.com/geocoder/v2/?ak=xxxxxxxxx&output=json&pois=1&location=';

async function processCsv() {
    const results = [];

    await new Promise((resolve, reject) => {
        fs.createReadStream(csvPath)
            .pipe(csv())
            .on('data', async (row) => {
                if (row.lat && row.lng) {
                    try {
                        const location = `${row.lat},${row.lng}`;
                        const response = await request.get({ url: reqUrl + location });
                        const body = JSON.parse(response);

                        if (!body.error && body.status === 0) {
                            const resString = [
                                row.lat,
                                row.lng,
                                body.result.location.lat,
                                body.result.location.lng,
                                body.result.formatted_address,
                                body.result.business,
                                body.result.addressComponent.city,
                                body.result.addressComponent.district,
                                body.result.addressComponent.province,
                                body.result.addressComponent.street,
                                body.result.addressComponent.street_number
                            ].join(',');

                            results.push(resString);
                        } else {
                            console.error(`Error processing ${location}:`, body);
                            results.push(`${row.lat},${row.lng},error`);
                        }
                    } catch (err) {
                        console.error(`Request error for ${location}:`, err);
                        results.push(`${row.lat},${row.lng},error`);
                    }
                }
            })
            .on('end', () => {
                resolve();
            })
            .on('error', (error) => {
                reject(error);
            });
    });

    // 将结果写入文件
    fs.writeFileSync(resultPath, results.join('\n'));
}

processCsv().then(() => {
    console.log('Processing completed.');
}).catch((error) => {
    console.error('An error occurred:', error);
});

解释

  1. 封装请求:我们使用request-promise-native来确保每个请求返回一个Promise
  2. 异步处理:在transform方法中,我们使用async/await来等待每个请求的结果。
  3. 结果收集:我们将每个请求的结果存储在一个数组results中。
  4. 写入文件:最后,我们将所有结果写入result.csv文件。

这种方法使得代码逻辑更加清晰,更容易理解和维护。


怎么没有人回答一下呢

求解决办法

帮你顶一个~~

用csv transform 异步模式可以不考虑上面那个问题了,下面是代码,但是下面代码另一个问题:如果在回调中获取改行对应的index,由于是异步,获取的index是错误的,不知如何解决,网大家指教

csv().from.path(csvpath, {header: true, columns: true})
    .transform(function(row,index, callback){
        if(index < 2) return;
        if(!row){
            throw new Error('error on import row index #'+index+' '+JSON.stringify(row));
        }
        var location = row.lat.trim() + ',' + row.lng.trim();
        var row_index = index;
            request.get({url:requrl+location}, function (error, response, body) {
                var resString = '';
                if(!error && response.statusCode == 200) {
                    body = JSON.parse(body);
                    if(body.status == '0'){
                        var result = body.result;
                        resString = row_index + ',' + result.location.lat + ',' + result.location.lng + ',"'
                            + result.formatted_address + '","' + result.business + '",' + result.addressComponent.city + ','
                            + result.addressComponent.district + ',' + result.addressComponent.province + ',"'
                            + result.addressComponent.street + '",' + result.addressComponent.street_number;
                        var pois = result.pois;
                        for(var i=0; i<pois.length; i++){
                            var poi = pois[i];
                            resString = resString + ',poi_' + i + ',"' + poi.name + '","' + poi.poiType + '","' + poi.addr + '",'
                                + poi.point.x + ',' + poi.point.y + ',' + poi.distance;
                        }
                    }else{
                        resString =  row_index + ", error \n";
                    }
                }else{
                    console.log(row_index + "--- " + error + "---" + response.statusCode + '-----body--' + body);
                    resString =  row_index + ", error \n";
                }
                callback(null, resString + "\n");
            });
    })
    .on('end', function(count){
        console.log('Number of lines: '+count);
    })
    .on('error', function(error){
        console.log(error);
        return next(error);
    })
    .to.path('./result_04.csv');

//异步请求,在回调函数中处理结果返回,怎么在外面获取这个返回值??? 可以在回调函数中调用外部过程;可以触发一个事件,另有外部过程订阅该事件。

在Node.js中,异步操作(如网络请求)无法直接在回调函数外同步获取返回值。这是因为JavaScript的事件循环机制决定了异步代码不会阻塞主线程。为了解决这个问题,可以使用async/await语法来简化异步操作的处理。

以下是修改后的代码示例:

const fs = require('fs');
const path = require('path');
const csv = require('csv-parser');
const request = require('request');

const csvpath = "C:/Users/name/Desktop/address.csv";

async function processCSV() {
    const results = [];

    await new Promise((resolve, reject) => {
        fs.createReadStream(csvpath)
            .pipe(csv({ headers: true }))
            .on('data', (row) => {
                if (!row || row.length < 3) return; // 跳过不完整的行
                const location = `${row.lat.trim()},${row.lng.trim()}`;
                request.get({ url: `http://api.map.baidu.com/geocoder/v2/?ak=xxxxxxxxx&output=json&pois=1&location=${location}` }, (error, response, body) => {
                    if (!error && response.statusCode === 200) {
                        try {
                            const result = JSON.parse(body);
                            if (result.status === 0) {
                                results.push({
                                    index: row.index,
                                    lat: result.location.lat,
                                    lng: result.location.lng,
                                    address: result.formatted_address,
                                    city: result.addressComponent.city,
                                    district: result.addressComponent.district,
                                    province: result.addressComponent.province,
                                    street: result.addressComponent.street,
                                    street_number: result.addressComponent.street_number,
                                    pois: result.pois.map(poi => ({
                                        name: poi.name,
                                        poiType: poi.poiType,
                                        addr: poi.addr,
                                        point: `${poi.point.x},${poi.point.y}`,
                                        distance: poi.distance
                                    }))
                                });
                            }
                        } catch (e) {
                            console.error(`Error parsing response: ${body}`);
                        }
                    } else {
                        console.error(`Request error: ${error} - Status code: ${response.statusCode}`);
                    }
                });
            })
            .on('end', () => {
                resolve();
            })
            .on('error', (err) => {
                reject(err);
            });
    });

    // 将结果写入文件
    fs.writeFileSync('./result.csv', results.map(result => [
        result.index,
        result.lat,
        result.lng,
        result.address,
        result.city,
        result.district,
        result.province,
        result.street,
        result.street_number,
        ...result.pois.map(poi => [poi.name, poi.poiType, poi.addr, poi.point, poi.distance])
    ].join(',')));
}

processCSV();

解释:

  1. async 函数:将处理逻辑封装在一个异步函数中。
  2. await 关键字:用于等待异步操作完成。
  3. Promise:将传统的回调式API转换为支持await的Promise形式。
  4. fs 模块:用于读取和写入文件。
  5. request 模块:用于发送HTTP请求。

这样可以确保所有的异步请求都完成后再进行后续处理,最终将结果保存到result.csv文件中。

回到顶部