Output buffer #22

Open
thxmike opened this issue Sep 19, 2015 · 2 comments

@thxmike

thxmike commented Sep 19, 2015

In the following example, I am trying to take the first pump's (extract_pump) output buffer and make it the input buffer (from()) of the second pump (transform_pump). I can see the writes going into the output buffer; however, the data never reaches the second pump. I also get a promise error, even though I am returning a promise from the same library you are using. Perhaps you can clarify why:

var datapumps = require('datapumps');
var mssql = require('mssql');
var Promise = require('bluebird');

var config = {
    user: 'abc123',
    password: 'abc123',
    server: 'server', // You can use 'localhost\\instance' to connect to named instance
    database: 'db',
    stream: true, // You can enable streaming globally
    requestTimeout: 60000,
    options: {
        encrypt: false // Use this if you're on Windows Azure
    }
};

var connection = new mssql.Connection(config, function (err) {

    if (err) {
        console.log(err);
        return;
    }

    var request = connection.request();

    request.query('select top 10 column from table where othercolumn = 0', function (err, recordset) {
        // ... error checks
        if (err) {
            console.log(err);
        }

        // Only executed when streaming is disabled (stream: false)
        //console.log(recordset);

    });

    // These event handlers are only used when streaming is enabled (stream: true)
    request.on('recordset', function (columns) {
        // Emitted once for each recordset in a query
    });

    request.on('row', function (row) {
        // Emitted for each row
        extract_input_buffer.writeAsync(row);

        console.log(extract_input_buffer.content.length);
    });

    request.on('error', function (err) {
        // May be emitted multiple times
    });

    request.on('done', function (returnValue) {
        // Always emitted as the last one
        connection.close();
        extract_input_buffer.seal();

    });

});

var extract_pump = new datapumps.Pump();

var extract_input_buffer = new datapumps.Buffer({size: 100});

var extract_output_buffer = extract_pump.buffer();

extract_pump.errorBuffer().on('write', function (data) {
    console.log(data);
});

extract_pump.buffer('output').on('write', function (data) {
    console.log(extract_output_buffer.content.length);
});

extract_pump.buffer('output').on('end', function (data) {
    console.log("output buffer has ended");
});

var transform_pump = new datapumps.Pump();

extract_pump
    .from(extract_input_buffer)
    .to(transform_pump, 'output')
    .logErrorsToConsole()
    .process(function (data) {
        console.log(extract_input_buffer.content.length);
        return new Promise(function (resolve, reject) {
            resolve(data);
        });
    })
    .start()
    .whenFinished(function () {

        console.log(extract_output_buffer);
        extract_output_buffer.seal();
    });
/*
 .run()
 .then(function(){

 console.log(extract_output_buffer);
 extract_output_buffer.seal();
 });
 */

transform_pump
    .logErrorsToConsole()
    .run()
    .then(function () {
        var from1buffer = extract_pump.from();
        var output1buffer = extract_pump.buffer('output');
        var from2buffer = transform_pump.from();
        var output2buffer = transform_pump.buffer('output');
        console.log(from1buffer.content.length);
        console.log(output1buffer.content.length);
        console.log(from2buffer.content.length);
        console.log(output2buffer.content.length);
    });
@novaki
Member

novaki commented Sep 20, 2015

Hi, the problem is that extract_output_buffer is never written to. The promise that you return from the extract pump's process() callback should resolve once the data has been written to the output buffer.
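The maintainer's point can be illustrated with a small, self-contained model. This is not the datapumps implementation: MiniBuffer and runPump are hypothetical stand-ins, assuming only what the reply implies, namely that an overridden process() callback receives each item and nothing reaches the output buffer unless the callback writes it there and returns the promise of that write.

```javascript
'use strict';

// Hypothetical stand-in for a datapumps buffer (not the real class): it only
// models `content` and a promise-returning `writeAsync`, with no size limit.
class MiniBuffer {
  constructor() {
    this.content = [];
  }

  writeAsync(data) {
    this.content.push(data); // the item lands in the buffer immediately
    return Promise.resolve(); // the promise signals that the write is done
  }
}

// Hypothetical stand-in for a pump run: each input item is passed to
// `processFn`, with `this.buffer()` returning the output buffer. Nothing is
// copied to the output unless `processFn` writes it there itself.
function runPump(items, output, processFn) {
  const pump = { buffer: () => output };
  return Promise.all(items.map((item) => processFn.call(pump, item)));
}

const rows = [{ id: 1 }, { id: 2 }, { id: 3 }];

// Callback as written in the issue: it resolves with the data but never writes it.
const lostOutput = new MiniBuffer();
runPump(rows, lostOutput, function (data) {
  return new Promise(function (resolve) {
    resolve(data);
  });
});

// Callback following the maintainer's advice: return the promise of the write.
const fixedOutput = new MiniBuffer();
runPump(rows, fixedOutput, function (data) {
  return this.buffer().writeAsync(data);
});

console.log(lostOutput.content.length); // 0
console.log(fixedOutput.content.length); // 3
```

In the model, the first callback leaves the output buffer empty, which matches the symptom in the issue: a downstream pump reading that buffer would never receive any rows.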

@thxmike
Author

thxmike commented Sep 20, 2015

But isn't extract_output_buffer written to automatically? The line var extract_output_buffer = extract_pump.buffer(); indicates that it is just a reference to extract_pump's output buffer.

Per your documentation:
"A pump reads data from its input buffer or stream and copies it to the output buffer by default:"

So, if I have this right, here is what should happen:

  • I have a pump called "extract_pump". (ln 62)
  • I define a buffer called "extract_input_buffer" and set it as the input buffer of "extract_pump" via from(). (ln 64 and 83)
  • I write each row from the database to "extract_input_buffer" asynchronously. (ln 44)
  • I reference "extract_pump"'s output buffer and call it "extract_output_buffer". (ln 66)
  • I define a second pump, "transform_pump".
  • I use "extract_output_buffer" as the input of "transform_pump". (ln 84)
  • I start both pumps: start() on "extract_pump" and run() on "transform_pump". (ln 93 and 110)

Shouldn't "extract_pump" read from "extract_input_buffer" and write to "extract_output_buffer"? Based on your documentation, that is my assumption.

Or is this only the default behavior, when you don't override the "process" function?

As a developer, when you override the "process" method, are you responsible for writing to the output buffer yourself?

If this is the case, can you update the documentation?

Please let me know; I am eager to hear your answer.
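The distinction asked about here, default copying versus an overridden process callback, can be sketched with a toy pump. ToyBuffer, ToyPump and its copy() method are hypothetical and are not the datapumps classes; the sketch assumes, as the maintainer's reply suggests, that the default callback copies each item to the output buffer and that passing a callback to process() replaces that default entirely.

```javascript
'use strict';

// Hypothetical stand-in for a buffer: an array plus a promise-returning write.
class ToyBuffer {
  constructor() {
    this.content = [];
  }

  writeAsync(data) {
    this.content.push(data);
    return Promise.resolve();
  }
}

// Hypothetical stand-in for a pump. Its default callback copies each item to
// the output buffer; calling process(fn) replaces that default with `fn`.
class ToyPump {
  constructor(output) {
    this.output = output;
    this.processFn = this.copy; // default behavior: copy input to output
  }

  buffer() {
    return this.output;
  }

  copy(data) {
    return this.buffer().writeAsync(data);
  }

  process(fn) {
    this.processFn = fn; // the default copy is no longer called
    return this;
  }

  run(items) {
    return Promise.all(items.map((item) => this.processFn.call(this, item)));
  }
}

const inputRows = [{ id: 1 }, { id: 2 }];

// No process() override: items are copied by the default callback.
const copied = new ToyBuffer();
new ToyPump(copied).run(inputRows);

// Override that transforms each item: it must write the result itself.
const transformed = new ToyBuffer();
new ToyPump(transformed)
  .process(function (row) {
    return this.buffer().writeAsync({ id: row.id, doubled: row.id * 2 });
  })
  .run(inputRows);

console.log(copied.content.length); // 2
console.log(JSON.stringify(transformed.content)); // [{"id":1,"doubled":2},{"id":2,"doubled":4}]
```

Under these assumptions, an override that only resolves its promise behaves like the callback in the original example: the pump finishes, but its output buffer stays empty.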
