packages/oo7-substrate/src/nodeService.js
const { Bond } = require('oo7')
const WebSocket = require('isomorphic-ws')
const subscriptionKey = {
author_submitAndWatchExtrinsic: {
notification: 'author_extrinsicUpdate',
subscribe: 'author_submitAndWatchExtrinsic',
unsubscribe: 'author_unwatchExtrinsic'
},
state_storage: {
notification: 'state_storage',
subscribe: 'state_subscribeStorage',
unsubscribe: 'state_unsubscribeStorage'
},
chain_newHead: {
notification: 'chain_newHead',
subscribe: 'chain_subscribeNewHead',
unsubscribe: 'chain_unsubscribeNewHead'
},
chain_finalisedHead: {
notification: 'chain_finalisedHead',
subscribe: 'chain_subscribeFinalisedHeads',
unsubscribe: 'chain_unsubscribeFinalisedHeads'
},
state_runtimeVersion: {
notification: 'state_runtimeVersion',
subscribe: 'state_subscribeRuntimeVersion',
unsubscribe: 'state_unsubscribeRuntimeVersion'
}
}
let uri = ['ws://127.0.0.1:9944']
function setNodeUri(u) {
if (uri === u) return
uri = u
if (!s_nodeService) return // prevent instanciating
s_nodeService.uri = u
s_nodeService.uriIndex = 0
s_nodeService.uriChanged = true
s_nodeService.start()
}
class NodeService {
constructor (uri) {
this.subscriptions = {}
this.cancelations = {}
this.pendingCancelations = {}
this.theirIds = {}
this.onReply = {}
this.onceOpen = []
this.index = 1
this.uriIndex = 0
this.backoff = 0
this.uri = uri
this.status = new Bond
this.start(uri[0])
}
start (uri = this.uri[0]) {
if (this.ws) {
this.ws.close()
delete this.ws
}
let that = this
this.ws = new WebSocket(uri)
this.ws.onopen = function () {
console.log('Connection open')
that.rejig()
that.backoff = 0
let onceOpen = that.onceOpen;
that.onceOpen = []
setTimeout(() => {
// console.warn("Proceessing deferred requests...")
onceOpen.forEach(f => f())
}, 0)
that.status.trigger({connected: uri})
}
this.ws.onmessage = function (msg) {
if (that.reconnect) {
clearTimeout(that.reconnect)
}
let d = JSON.parse(msg.data)
// console.log('Incoming:', d);
if (d.id) {
that.onReply[d.id](d)
delete that.onReply[d.id];
} else if (d.method && d.params && that.subscriptions[d.params.subscription]) {
that.subscriptions[d.params.subscription].callback(d.params.result, d.method)
} else if (that.pendingCancelations[d.params.subscription]) {
// Ok; this message was sent by them before they heard that we wanted to unsubscribe.
} else {
console.warn("Subscription reply without recognised ID", d.params.subscription)
}
// epect a message every 10 seconds or we reconnect.
that.reconnect = setTimeout(() => { console.log('Reconnecting.'); that.start() }, 60000)
}
this.ws.onerror = (err) => {
if (that.uriChanged) {
delete that.uriChanged
return // no reconnection if uri changed
}
setTimeout(() => {
that.uriIndex = (that.uriIndex + 1) % that.uri.length
that.start(that.uri[that.uriIndex])
}, that.backoff)
that.backoff = Math.min(30000, that.backoff + 1000)
that.status.trigger({error: err})
}
}
rejig () {
let that = this
let subs = this.subscriptions
this.subscriptions = {}
let theirIds = this.theirIds
this.theirIds = {}
Object.keys(theirIds).forEach(ourId => {
let sub = subs[theirIds[ourId]]
that.subscribe(sub.what, sub.params, sub.callback, console.warn, ourId)
})
}
req (method, params, callback) {
let that = this
let doSend = () => {
let id = '' + this.index++;
// console.warn("Executing request", method, params, id, callback)
let msg = {
"jsonrpc": "2.0",
"id": id,
"method": method,
"params": params
};
that.ws.send(JSON.stringify(msg))
that.onReply[id] = callback
}
if (this.ws.readyState === 1) {
doSend(callback)
} else {
// console.warn("Defering request until connected", method, params)
that.onceOpen.push(() => {
doSend(callback)
})
}
}
request (method, params = []) {
let that = this
return new Promise((resolve, reject) => {
that.req(method, params, msg => {
// console.warn("Processing request reply", method, params, msg)
if (msg.error) {
reject(msg.error)
} else {
resolve(msg.result)
}
})
})
}
subscribe (what, params, callback, errorHandler, ourId = null) {
let that = this
return new Promise((resolve, reject) => {
// console.log('Subscribing', ourId)
this.req(subscriptionKey[what].subscribe, params, msg => {
if (msg.error) {
// console.log('Error subscribing', ourId)
errorHandler(msg.error)
} else {
let theirId = msg.result
// console.log('Subscribed', 'ourId=', ourId, 'theirId=', theirId)
if (that.cancelations[ourId]) {
// console.log('Delayed unsubscription of', ourId)
that.pendingCancelations[theirId] = ourId
this.req(subscriptionKey[what].unsubscribe, [theirId], () => {
delete that.pendingCancelations[theirId]
delete that.cancelations[ourId]
}, errorHandler)
} else {
that.subscriptions[theirId] = { what, params, callback }
ourId = ourId || theirId
that.theirIds[ourId] = theirId
}
// We resolve to our ID regardless which should be safe since
// unsubscribes of old IDs are no-ops.
resolve(ourId)
}
})
})
}
unsubscribe (ourId) {
let that = this
if (this.theirIds[ourId] == null) {
// console.log('Resubscription not yet complete. Defering unsubscribe', ourId)
this.cancelations[ourId] = true
return
}
let theirId = this.theirIds[ourId]
if (!this.subscriptions[theirId]) {
throw 'Invalid subscription id'
}
let unsubscribe = subscriptionKey[this.subscriptions[theirId].what].unsubscribe
// console.log('Unsubscribing', ourId, theirId, this.subscriptions[theirId].what, unsubscribe)
this.req(unsubscribe, [theirId], () => {
delete that.theirIds[ourId]
delete that.subscriptions[theirId]
})
}
finalise () {
delete this.ws;
}
}
let s_nodeService = null;
function nodeService() {
if (!s_nodeService) {
s_nodeService = new NodeService(uri);
}
return s_nodeService;
}
module.exports = { nodeService, NodeService, setNodeUri };