Home Reference Source

packages/oo7-substrate/src/nodeService.js

  1. const { Bond } = require('oo7')
  2. const WebSocket = require('isomorphic-ws')
  3.  
  // Bundles the three JSON-RPC method names that make up one subscription kind.
  const describeSubscription = (notification, subscribe, unsubscribe) => ({
    notification,
    subscribe,
    unsubscribe
  })

  // Lookup table keyed by subscription kind. Each entry names the method that
  // carries notifications, the method that opens the subscription and the
  // method that closes it again.
  const subscriptionKey = {
    author_submitAndWatchExtrinsic: describeSubscription(
      'author_extrinsicUpdate',
      'author_submitAndWatchExtrinsic',
      'author_unwatchExtrinsic'
    ),
    state_storage: describeSubscription(
      'state_storage',
      'state_subscribeStorage',
      'state_unsubscribeStorage'
    ),
    chain_newHead: describeSubscription(
      'chain_newHead',
      'chain_subscribeNewHead',
      'chain_unsubscribeNewHead'
    ),
    chain_finalisedHead: describeSubscription(
      'chain_finalisedHead',
      'chain_subscribeFinalisedHeads',
      'chain_unsubscribeFinalisedHeads'
    ),
    state_runtimeVersion: describeSubscription(
      'state_runtimeVersion',
      'state_subscribeRuntimeVersion',
      'state_unsubscribeRuntimeVersion'
    )
  }

  // Endpoint list handed to the shared NodeService when it is first created.
  // Reassigned by setNodeUri, hence `let`.
  let uri = ['ws://127.0.0.1:9944']
  33.  
  /**
   * Replaces the module-wide endpoint list and, if the shared service has
   * already been created, points it at the new list and reconnects.
   *
   * The comparison is by identity, so passing the very same value again is a
   * no-op. The shared service is never created here.
   *
   * @param {string[]} u - New endpoint list; the service dials `u[0]` first.
   */
  function setNodeUri (u) {
    const unchanged = uri === u
    if (unchanged) {
      return
    }
    uri = u

    const service = s_nodeService
    if (service) {
      // An instance already exists: restart it from the head of the new list.
      Object.assign(service, { uri: u, uriIndex: 0, uriChanged: true })
      service.start()
    }
  }
  43.  
  /**
   * JSON-RPC 2.0 client that talks to a node over one WebSocket at a time.
   *
   * It provides plain request/reply calls (`request`), long-lived
   * subscriptions (`subscribe` / `unsubscribe`), fail-over across a list of
   * endpoint URIs and automatic re-subscription after every reconnect.
   * Relies on `WebSocket`, `Bond` and `subscriptionKey` from the enclosing
   * module scope.
   *
   * Identifier vocabulary used throughout:
   *  - "theirId": the subscription id the node returned for the current
   *    connection; it changes on every reconnect.
   *  - "ourId": the id handed back to the caller of `subscribe`; it stays
   *    stable across reconnects and is mapped to the current theirId.
   */
  class NodeService {
    /**
     * @param {string[]} uri - Endpoint URIs. `uri[0]` is dialled immediately;
     *   after a connection error the next entry is tried, wrapping around.
     */
    constructor (uri) {
      this.subscriptions = {} // theirId -> { what, params, callback }
      this.cancelations = {} // ourId -> true: unsubscribe asked for before the subscribe reply arrived
      this.pendingCancelations = {} // theirId -> ourId while such a late unsubscribe is in flight
      this.theirIds = {} // ourId -> theirId on the current connection
      this.onReply = {} // request id -> callback awaiting the reply
      this.onceOpen = [] // sends queued until the socket is open
      this.index = 1 // next request id
      this.uriIndex = 0 // position in `uri` of the endpoint in use
      this.backoff = 0 // delay in ms before the next retry after an error
      this.uri = uri
      this.status = new Bond()
      this.start(uri[0])
    }

    /**
     * Closes the current socket, if any, and opens a new one.
     *
     * Handlers of a superseded socket compare themselves against `this.ws`
     * and do nothing once they are stale, so the error some implementations
     * raise when a connecting socket is closed cannot trigger a fail-over.
     * Handlers stay attached on purpose: an emitter-style socket with no
     * error listener could otherwise raise an unhandled error.
     *
     * @param {string} [uri=this.uri[0]] - Endpoint to connect to.
     */
    start (uri = this.uri[0]) {
      // An explicit (re)start supersedes any retry still waiting on back-off.
      if (this.retry) {
        clearTimeout(this.retry)
        delete this.retry
      }

      const previous = this.ws
      if (previous) {
        // Forget the socket before closing it so its handlers see it as stale.
        delete this.ws
        previous.close()
      }
      // setNodeUri raises this flag to silence the old socket's error. The
      // staleness check below does that job, and clearing the flag here stops
      // it from swallowing a genuine error on the new connection.
      delete this.uriChanged

      const that = this
      const socket = new WebSocket(uri)
      this.ws = socket

      socket.onopen = function () {
        if (that.ws !== socket) return
        console.log('Connection open')
        that.rejig()
        that.backoff = 0
        const deferred = that.onceOpen
        that.onceOpen = []
        // Flush queued requests on a later tick, after this handler returns.
        setTimeout(() => {
          deferred.forEach(f => f())
        }, 0)
        that.status.trigger({connected: uri})
      }

      socket.onmessage = function (msg) {
        // Only the live socket is allowed to touch the watchdog.
        if (that.ws === socket && that.reconnect) {
          clearTimeout(that.reconnect)
        }

        const d = JSON.parse(msg.data)
        const subscription = d.params ? d.params.subscription : undefined
        if (d.id) {
          const reply = that.onReply[d.id]
          if (typeof reply === 'function') {
            reply(d)
            delete that.onReply[d.id]
          } else {
            // No callback is registered for this id; dropping the reply is
            // better than throwing out of the message handler.
            console.warn('Reply without recognised request ID', d.id)
          }
        } else if (d.method && d.params && that.subscriptions[subscription]) {
          that.subscriptions[subscription].callback(d.params.result, d.method)
        } else if (d.params && that.pendingCancelations[subscription]) {
          // Ok; this message was sent by them before they heard that we wanted to unsubscribe.
        } else {
          console.warn("Subscription reply without recognised ID", subscription)
        }

        // Expect a message every 60 seconds or we reconnect.
        if (that.ws === socket) {
          that.reconnect = setTimeout(() => { console.log('Reconnecting.'); that.start() }, 60000)
        }
      }

      socket.onerror = (err) => {
        if (that.ws !== socket) return // superseded or finalised: nothing to recover
        that.retry = setTimeout(() => {
          delete that.retry
          that.uriIndex = (that.uriIndex + 1) % that.uri.length
          that.start(that.uri[that.uriIndex])
        }, that.backoff)
        // Linear back-off, capped at 30 seconds; reset when a socket opens.
        that.backoff = Math.min(30000, that.backoff + 1000)
        that.status.trigger({error: err})
      }
    }

    /**
     * Re-issues every known subscription on the freshly opened connection.
     * Each one keeps its ourId; `subscribe` records the new theirId.
     */
    rejig () {
      const subs = this.subscriptions
      const theirIds = this.theirIds
      this.subscriptions = {}
      this.theirIds = {}
      Object.keys(theirIds).forEach(ourId => {
        const sub = subs[theirIds[ourId]]
        if (!sub) return // mapping without a subscription record: nothing to restore
        this.subscribe(sub.what, sub.params, sub.callback, console.warn, ourId)
      })
    }

    /**
     * Sends one JSON-RPC request, or queues it until a socket is open.
     *
     * @param {string} method - RPC method name.
     * @param {Array} params - RPC parameters.
     * @param {function(Object)} callback - Receives the whole reply message.
     */
    req (method, params, callback) {
      const that = this
      const doSend = () => {
        const id = '' + that.index++
        const msg = {
          jsonrpc: '2.0',
          id,
          method,
          params
        }
        that.ws.send(JSON.stringify(msg))
        that.onReply[id] = callback
      }

      // readyState 1 is OPEN. With no socket at all (after finalise) the
      // request is queued like any other early request instead of throwing.
      if (this.ws && this.ws.readyState === 1) {
        doSend()
      } else {
        this.onceOpen.push(doSend)
      }
    }

    /**
     * Promise flavour of `req`.
     *
     * @param {string} method - RPC method name.
     * @param {Array} [params=[]] - RPC parameters.
     * @returns {Promise<*>} Resolves with the reply's `result`, rejects with
     *   its `error`.
     */
    request (method, params = []) {
      return new Promise((resolve, reject) => {
        this.req(method, params, msg => {
          if (msg.error) {
            reject(msg.error)
          } else {
            resolve(msg.result)
          }
        })
      })
    }

    /**
     * Opens a subscription of kind `what`.
     *
     * @param {string} what - A key of `subscriptionKey`.
     * @param {Array} params - Parameters for the subscribe call.
     * @param {function(*, string)} callback - Called with each notification's
     *   result and method name.
     * @param {function(Object)} [errorHandler] - Called with the RPC error if
     *   subscribing fails; the promise then stays pending, as it always has.
     *   When omitted, the promise rejects with the error instead.
     * @param {?string} [ourId=null] - Stable id to reuse; set when
     *   re-subscribing after a reconnect.
     * @returns {Promise<string>} Resolves with the stable id to pass to
     *   `unsubscribe`.
     */
    subscribe (what, params, callback, errorHandler, ourId = null) {
      const that = this
      return new Promise((resolve, reject) => {
        that.req(subscriptionKey[what].subscribe, params, msg => {
          if (msg.error) {
            if (typeof errorHandler === 'function') {
              errorHandler(msg.error)
            } else {
              reject(msg.error)
            }
            return
          }

          const theirId = msg.result
          if (that.cancelations[ourId]) {
            // unsubscribe(ourId) was called while this request was in flight:
            // close the new subscription straight away.
            that.pendingCancelations[theirId] = ourId
            that.req(subscriptionKey[what].unsubscribe, [theirId], () => {
              delete that.pendingCancelations[theirId]
              delete that.cancelations[ourId]
            })
          } else {
            that.subscriptions[theirId] = { what, params, callback }
            ourId = ourId || theirId
            that.theirIds[ourId] = theirId
          }
          // We resolve to our ID regardless which should be safe since
          // unsubscribes of old IDs are no-ops.
          resolve(ourId)
        })
      })
    }

    /**
     * Closes a subscription previously opened with `subscribe`.
     *
     * If `ourId` has no theirId yet (for instance a re-subscription is still
     * in flight) the cancellation is only recorded and applied once the
     * subscribe reply arrives.
     *
     * @param {string} ourId - Id that `subscribe` resolved with.
     * @throws {string} 'Invalid subscription id' when the mapping exists but
     *   the subscription record does not. Still a string rather than an Error
     *   so existing callers observe the same thrown value.
     */
    unsubscribe (ourId) {
      const that = this

      if (this.theirIds[ourId] == null) {
        this.cancelations[ourId] = true
        return
      }
      const theirId = this.theirIds[ourId]
      if (!this.subscriptions[theirId]) {
        throw 'Invalid subscription id'
      }
      const unsubscribe = subscriptionKey[this.subscriptions[theirId].what].unsubscribe

      this.req(unsubscribe, [theirId], () => {
        delete that.theirIds[ourId]
        delete that.subscriptions[theirId]
      })
    }

    /**
     * Shuts the service down: cancels the watchdog and any pending retry,
     * then closes and forgets the socket. The socket's handlers are inert
     * afterwards, so nothing reconnects until `start` is called again.
     */
    finalise () {
      if (this.reconnect) {
        clearTimeout(this.reconnect)
        delete this.reconnect
      }
      if (this.retry) {
        clearTimeout(this.retry)
        delete this.retry
      }
      const socket = this.ws
      delete this.ws
      if (socket) {
        socket.close()
      }
    }
  }
  221.  
  // Module-wide NodeService instance; stays null until nodeService() is first
  // called.
  let s_nodeService = null

  /**
   * Returns the shared NodeService, creating it on first use with the
   * endpoint list currently held in `uri`.
   *
   * @returns {NodeService} The same instance on every call.
   */
  function nodeService () {
    if (s_nodeService) {
      return s_nodeService
    }
    s_nodeService = new NodeService(uri)
    return s_nodeService
  }
  230.  
  231. module.exports = { nodeService, NodeService, setNodeUri };