Skip to content

Commit

Permalink
Release v1.2.0 (#3)
Browse files Browse the repository at this point in the history
- merged in ChunkEmitter, only used here
- copied in indexOfLF, removed haraka-utils dependency
- LGTM: remove unused variable
  • Loading branch information
msimerson authored Jun 24, 2022
1 parent 83fc346 commit 39aeaa1
Show file tree
Hide file tree
Showing 8 changed files with 1,888 additions and 27 deletions.
9 changes: 8 additions & 1 deletion Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,22 @@
### Unreleased


### [1.2.0] - 2022-06-24

- merged in ChunkEmitter, only used here
- copied in indexOfLF, removed haraka-utils dependency


### [1.1.0] - 2022-06-23

- fix: boundary marker corruption issue haraka/Haraka#3068



## 1.0.0 - 2022-06-23

- Import from Haraka
- convert tests to mocha


[1.1.0]: https://github.com/haraka/message-stream/releases/tag/1.1.0
[1.2.0]: https://github.com/haraka/message-stream/releases/tag/1.2.0
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,17 @@

# haraka-message-stream

## Used By

- Haraka/transaction.js
- haraka/test-fixtures/transaction

## USAGE

```js
new MessageStream(cfg, uuid, header_list)
```


<!-- leave these buried at the bottom of the document -->
[ci-img]: https://github.com/haraka/message-stream/actions/workflows/ci.yml/badge.svg
Expand Down
110 changes: 92 additions & 18 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
'use strict';

const EventEmitter = require('events').EventEmitter;
const fs = require('fs');
const Stream = require('stream').Stream;
const utils = require('haraka-utils');

const ChunkEmitter = require('haraka-chunk-emitter');

const STATE_HEADERS = 1;
const STATE_BODY = 2;
const STATE = {
HEADERS: 1,
BODY : 2,
};

class MessageStream extends Stream {
constructor (cfg, id, headers) {
constructor (cfg = {}, id, headers) {
super();
if (!id) throw new Error('id required');
this.uuid = id;
this.write_ce = null;
this.read_ce = null;
this.bytes_read = 0;
this.state = STATE_HEADERS;
this.state = STATE.HEADERS;
this.idx = {};
this.end_called = false;
this.end_callback = null;
Expand Down Expand Up @@ -67,16 +67,16 @@ class MessageStream extends Stream {
this.bytes_read += line.length;

// Build up an index of 'interesting' data on the fly
if (this.state === STATE_HEADERS) {
if (this.state === STATE.HEADERS) {
// Look for end of headers line
if (line.length === 2 && line[0] === 0x0d && line[1] === 0x0a) {
this.idx.headers = { start: 0, end: this.bytes_read-line.length };
this.state = STATE_BODY;
this.state = STATE.BODY;
this.idx.body = { start: this.bytes_read };
}
}

if (this.state === STATE_BODY) {
if (this.state === STATE.BODY) {
// Look for MIME boundaries
if (line.length > 4 && line[0] === 0x2d && line[1] == 0x2d) {
let boundary = line.slice(2).toString().replace(/\s*$/,'');
Expand Down Expand Up @@ -197,10 +197,7 @@ class MessageStream extends Stream {
}
}

/*
** READABLE STREAM
*/

// READABLE STREAM
_read () {
const self = this;
if (!this.end_called) {
Expand Down Expand Up @@ -231,9 +228,8 @@ class MessageStream extends Stream {
});
}
else {
// Read the message body by line
// If we have queued entries, then we didn't
// create a queue file, so we read from memory.
// Read the message body by line. If we have queued entries, then
// we didn't create a queue file, so read from memory.
if (this._queue.length > 0) {
// TODO: implement start/end offsets
for (let i=0; i<this._queue.length; i++) {
Expand All @@ -260,7 +256,7 @@ class MessageStream extends Stream {

process_buf (buf) {
let offset = 0;
while ((offset = utils.indexOfLF(buf)) !== -1) {
while ((offset = indexOfLF(buf)) !== -1) {
let line = buf.slice(0, offset+1);
buf = buf.slice(line.length);
// Don't output headers if they where sent already
Expand Down Expand Up @@ -411,6 +407,13 @@ class MessageStream extends Stream {
}
}

function indexOfLF (buf) {
for (let i=0; i<buf.length; i++) {
if (buf[i] === 0x0a) return i;
}
return -1;
}

module.exports = MessageStream;

class GetDataStream extends Stream {
Expand Down Expand Up @@ -439,3 +442,74 @@ class GetDataStream extends Stream {
// ignore
}
}

class ChunkEmitter extends EventEmitter {
constructor (buffer_size) {
super();
this.buffer_size = parseInt(buffer_size) || (64 * 1024);
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
}

fill (input) {
if (typeof input === 'string') {
input = Buffer.from(input);
}

// Optimization: don't allocate a new buffer until the input we've
// had so far is bigger than our buffer size.
if (!this.buf) {
// We haven't allocated a buffer yet
this.bufs.push(input);
this.bufs_size += input.length;
if ((input.length + this.bufs_size) > this.buffer_size) {
this.buf = Buffer.alloc(this.buffer_size);
const in_new = Buffer.concat(this.bufs, this.bufs_size);
input = in_new;
// Reset
this.bufs = [];
this.bufs_size = 0;
}
else {
return;
}
}

while (input.length > 0) {
let remaining = this.buffer_size - this.pos;
if (remaining === 0) {
this.emit('data', this.buf); //.slice(0));
this.buf = Buffer.alloc(this.buffer_size);
this.pos = 0;
remaining = this.buffer_size;
}
const to_write = ((remaining > input.length) ? input.length : remaining);
input.copy(this.buf, this.pos, 0, to_write);
this.pos += to_write;
input = input.slice(to_write);
}
}

end (cb) {
let emitted = false;
if (this.bufs_size > 0) {
this.emit('data', Buffer.concat(this.bufs, this.bufs_size));
emitted = true;
}
else if (this.pos > 0) {
this.emit('data', this.buf.slice(0, this.pos));
emitted = true;
}
// Reset
this.buf = null;
this.pos = 0;
this.bufs = [];
this.bufs_size = 0;
if (cb && typeof cb === 'function') cb();
return emitted;
}
}

module.exports.ChunkEmitter = ChunkEmitter
12 changes: 5 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "haraka-message-stream",
"version": "1.1.0",
"description": "Haraka message stream library",
"version": "1.2.0",
"description": "Haraka email message stream",
"main": "index.js",
"scripts": {
"lint": "npx eslint *.js test/*.js",
"lint": "npx eslint *.js test",
"lintfix": "npx eslint --fix *.js test",
"versions": "npx dependency-version-checker check",
"test": "npx mocha"
Expand All @@ -15,6 +15,7 @@
},
"keywords": [
"haraka",
"email",
"message-stream"
],
"author": "Haraka Team <[email protected]>",
Expand All @@ -29,8 +30,5 @@
"haraka-test-fixtures": "*",
"mocha": ">=9"
},
"dependencies": {
"haraka-chunk-emitter": "^1.0.1",
"haraka-utils": "^1.0.3"
}
"dependencies": {}
}
64 changes: 64 additions & 0 deletions test/chunk-emitter.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@

const assert = require('assert')
const fs = require('fs')
const path = require('path')

const ChunkEmitter = require('../index').ChunkEmitter

describe('chunk-emitter', function () {

beforeEach(function () {
this.ce = new ChunkEmitter()
this._written = 0
})

it('loads', function () {
assert.ok(this.ce)
})

it('emits all unbuffered bytes', function (done) {
const msgPath = path.join(__dirname, 'fixtures', 'haraka-icon-attach.eml')
const eml = fs.readFileSync(msgPath, 'utf8');

this._write = (data) => {
this._written = (this._written || 0) + data.length
if (eml.length === this._written) {
assert.equal(eml.length, this._written)
done()
}
}

this.ce.on('data', chunk => {
this._write(chunk);
})

this.ce.fill(eml)
this.ce.end()
})

it('emits all bigger than buffer bytes', function (done) {
const msgPath = path.join(__dirname, 'fixtures', 'haraka-tarball-attach.eml')
// console.log(`msgPath: ${msgPath}`)
const eml = fs.readFileSync(msgPath, 'utf8');
// console.log(`length: ${eml.length}`)

this._write = (data) => {
// console.log(`_write: ${data.length} bytes`)
this._written = this._written + data.length
// console.log(`_written: ${this._written}`)
if (eml.length === this._written) {
assert.equal(eml.length, this._written)
// console.log(this.ce)
done()
}
}

this.ce.on('data', chunk => {
// console.log(`ce.on.data: ${chunk.length} bytes`)
this._write(chunk);
})

this.ce.fill(eml)
this.ce.end()
})
})
Loading

0 comments on commit 39aeaa1

Please sign in to comment.