Skip to content

Commit

Permalink
Replace duplexify and friends with readable-stream v4 (#7)
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Nagy <[email protected]>
  • Loading branch information
vweevers and ronag authored Jul 2, 2022
1 parent 9606554 commit 308bde7
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 19 deletions.
20 changes: 11 additions & 9 deletions guest.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
'use strict'

const duplexify = require('duplexify')
const { AbstractLevel, AbstractIterator } = require('abstract-level')
const eos = require('end-of-stream')
const lpstream = require('length-prefixed-stream')
const lpstream = require('@vweevers/length-prefixed-stream')
const ModuleError = require('module-error')
const { input, output } = require('./tags')
const { Duplex, pipeline, finished } = require('readable-stream')

const kExplicitClose = Symbol('explicitClose')
const kAbortRequests = Symbol('abortRequests')
Expand Down Expand Up @@ -108,10 +107,8 @@ class ManyLevelGuest extends AbstractLevel {
self[kFlushed]()
})

const proxy = duplexify()
proxy.setWritable(decode)
proxy.setReadable(encode)
eos(proxy, cleanup)
const proxy = Duplex.from({ writable: decode, readable: encode })
finished(proxy, cleanup)
this[kRpcStream] = proxy
return proxy

Expand Down Expand Up @@ -312,7 +309,7 @@ class ManyLevelGuest extends AbstractLevel {
this[kAbortRequests]('Aborted on database close()', 'LEVEL_DATABASE_NOT_OPEN')

if (this[kRpcStream]) {
eos(this[kRpcStream], () => {
finished(this[kRpcStream], () => {
this[kRpcStream] = null
this._close(cb)
})
Expand All @@ -330,7 +327,12 @@ class ManyLevelGuest extends AbstractLevel {
// For tests only so does not need error handling
this[kExplicitClose] = false
const remote = this[kRemote]()
remote.pipe(this.connect()).pipe(remote)
pipeline(
remote,
this.connect(),
remote,
() => {}
)
} else if (this[kExplicitClose]) {
throw new ModuleError('Cannot reopen many-level database after close()', {
code: 'LEVEL_NOT_SUPPORTED'
Expand Down
9 changes: 4 additions & 5 deletions host.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
'use strict'

const lpstream = require('length-prefixed-stream')
const lpstream = require('@vweevers/length-prefixed-stream')
const ModuleError = require('module-error')
const eos = require('end-of-stream')
const duplexify = require('duplexify')
const { Duplex, finished } = require('readable-stream')
const { input, output } = require('./tags')

const rangeOptions = new Set(['gt', 'gte', 'lt', 'lte'])
Expand Down Expand Up @@ -60,7 +59,7 @@ function createRpcStream (db, options, streamOptions) {
const readonly = options.readonly
const decode = lpstream.decode()
const encode = lpstream.encode()
const stream = duplexify(decode, encode)
const stream = Duplex.from({ writable: decode, readable: encode })

const preput = options.preput
const predel = options.predel
Expand All @@ -85,7 +84,7 @@ function createRpcStream (db, options, streamOptions) {

const iterators = new Map()

eos(stream, function () {
finished(stream, function () {
for (const iterator of iterators.values()) {
iterator.close()
}
Expand Down
8 changes: 3 additions & 5 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,11 @@
"UPGRADING.md"
],
"dependencies": {
"@vweevers/length-prefixed-stream": "^1.0.0",
"abstract-level": "^1.0.3",
"duplexify": "^4.1.1",
"end-of-stream": "^1.1.0",
"length-prefixed-stream": "^2.0.0",
"module-error": "^1.0.2",
"protocol-buffers-encodings": "^1.1.0"
"protocol-buffers-encodings": "^1.1.0",
"readable-stream": "^4.0.0"
},
"devDependencies": {
"@types/readable-stream": "^2.3.13",
Expand All @@ -43,7 +42,6 @@
"memory-level": "^1.0.0",
"nyc": "^15.1.0",
"protocol-buffers": "^5.0.0",
"readable-stream": "^3.6.0",
"standard": "^16.0.3",
"tape": "^5.0.1",
"ts-standard": "^11.0.0",
Expand Down

0 comments on commit 308bde7

Please sign in to comment.