const zookeeper = require('node-zookeeper-client');
const State = zookeeper.State;
const ACL = zookeeper.ACL;
const CreateMode = zookeeper.CreateMode;
const Exception = zookeeper.Exception;
// use bluebird promises if available
try {
var bluebird = require('bluebird');
if (bluebird) {
global.Promise = bluebird.Promise;
}
} catch (_) { }
module.exports = Object.assign({}, zookeeper);
/**
* An asynchronous zookeeper transaction. Created by AsyncClient.transaction()
*/
class AsyncTransaction {
constructor(transaction) {
this._transaction = transaction;
}
/**
* Add a create operation with given path, data, acls and mode.
* @param {String} path Path of the node.
* @param {Buffer} data The data buffer, optional, defaults to null.
* @param {ACL[]} acls An array of ACL objects, optional, defaults to ACL.OPEN_ACL_UNSAFE
* @param {CreateMode} mode The creation mode, optional, defaults to CreateMode.PERSISTENT
* @returns {AsyncTransaction}
*/
create(path, data = null, acls = null, mode = null) {
this._transaction.create(path, data, acls, mode);
return this;
}
/**
* Add a set-data operation with the given path, data and optional version.
* @param {string} path Path of the node.
* @param {Buffer} data The data buffer, or null.
* @param {number} version The version of the node, optional, defaults to -1.
* @returns {AsyncTransaction}
*/
setData(path, data, version = -1) {
this._transaction.setData(path, data, version);
return this;
}
/**
* Add a check (existence) operation with given path and optional version.
* @param {*} path Path of the node.
* @param {*} version The version of the node, optional, defaults to -1.
* @returns {AsyncTransaction}
*/
check(path, version = -1) {
this._transaction.check(path, version);
return this;
}
/**
* Add a delete operation with the given path and optional version.
* @param {*} path Path of the node.
* @param {*} version The version of the node, optional, defaults to -1.
* @returns {AsyncTransaction}
*/
remove(path, version = -1) {
this._transaction.remove(path, version);
return this;
}
/**
* Execute the transaction atomically.
* @param {Function} cb The callback function.
*/
commit(cb) {
this._transaction.commit(cb);
}
/**
* Execute the transaction atomically. Resolves when the operation has completed,
* rejects if anything goes wrong.
* @returns {Promise}
*/
commitAsync() {
return new Promise((resolve, reject) => {
this._transaction.commit((e, r) => {
if (e) reject(e);
else resolve(r);
})
});
}
}
/**
* A zookeeper client that has methods that return promises.
* @example
* const zk = require('node-zookeeper-client-async');
*
* const client = zk.createAsyncClient("127.0.0.1:2181");
*
* // connect to the server
* await client.connectAsync();
* console.log('connected!');
*
* // create a node
* const rootPath = await client.mkdirpAsync('/test');
* console.log(`created ${rootPath}`)
*
* // add some ephemeral nodes
* await client.createAsync('/test/counter-', Buffer.from('first'), null, zk.CreateMode.EPHEMERAL_SEQUENTIAL);
* await client.createAsync('/test/counter-', Buffer.from('second'), null, zk.CreateMode.EPHEMERAL_SEQUENTIAL);
*
* // list the nodes
* const nodes = await client.getChildrenAsync('/test');
*
* // print stuff to console
* console.log(`${rootPath} has the children:`)
* await Promise.all(nodes.map(async node => {
* const data = await client.getDataAsync(`/test/${node}`);
* console.log(` ${node}: ${data.data}`);
* }));
*
* // delete everything
* await client.rmrfAsync(rootPath);
*
* // shut down
* await client.closeAsync();
* console.log('disconnected');
*/
class AsyncClient {
/**
* Creates a new AsyncClient for communicating with zookeeper. This object will
* expose all the methods of the underlying zookeeper client, but also expose
* a number of methods that return promises.
* @param {Client} client the underlying zookeeper client created by createClient.
*/
constructor(client) {
this._client = client;
// this object has all the methods of the underlying client.
for (let k in client) {
if (k in AsyncClient.prototype) continue;
const v = client[k];
if (typeof v == 'function') this[k] = v.bind(client);
}
}
/**
* Initiate the connection to the provided server list (ensemble). The client will
* pick an arbitrary server from the list and attempt to connect to it. If the
* establishment of the connection fails, another server will be tried (picked
* randomly) until a connection is established or close method is invoked.
* @returns {Promise} resolves on connect. Rejects with message on failure.
*/
connectAsync() {
return new Promise((resolve, reject) => {
this._client.once('connected', () => resolve());
this._client.once('connectedReadOnly', () => resolve());
this._client.once('authenticationFailed', () => reject('Authentication failed.'));
this._client.once('disconnected', () => reject('Client disconnected before it could open a successful connection.'));
this._client.connect();
});
}
/**
* Close this client. Once the client is closed, its session becomes invalid. All the
* ephemeral nodes in the ZooKeeper server associated with the session will be
* removed. The watchers left on those nodes (and on their parents) will be triggered.
* Resolves true when the client disconnects, resolves false if the client is already
* disconnected.
* @returns {Promise} resolves when disconnected. Does not reject.
*/
closeAsync() {
return new Promise((resolve, reject) => {
if (this._client.getState() == State.DISCONNECTED) {
resolve(false);
} else {
this._client.once('disconnected', () => resolve(true));
this._client.close();
}
});
}
/**
* Create a node with given path, data, acls and mode. Resolves the path name if
* successful, resolves false if the path already exists, and rejects for anything
* else going wrong
* @param {String} path Path of the node.
* @param {Buffer} data The data buffer, optional, defaults to null.
* @param {ACL} acls An array of ACL objects, optional, defaults to ACL.OPEN_ACL_UNSAFE.
* @param {CreateMode} mode The creation mode, optional, defaults to CreateMode.PERSISTENT.
* @returns {Promise} resolves with the path of the created item. Rejects with an error
* if something goes wrong.
*/
createAsync(path, data = null, acls = null, mode = null) {
return new Promise((resolve, reject) => {
this._client.create(path, data, acls, mode, (error, path) => {
if (error) {
if (error.code == Exception.NODE_EXISTS) {
resolve(false);
} else {
reject(error);
}
} else {
resolve(path);
}
});
});
}
/**
* Delete a node with the given path and version. If version is provided and not equal
* to -1, the request will fail when the provided version does not match the server
* version. Resolves true if the node is deleted. Resolves false if the node does
* not exist. Rejects if the node has children, if the version does not match, or
* for any other result.
* @param {String} path Path of the node.
* @param {Number} version The version of the node, optional, defaults to -1.
* @returns {Promise}
*/
removeAsync(path, version = -1) {
return new Promise((resolve, reject) => {
this._client.remove(path, version, e => {
switch (e && e.code) {
case Exception.NO_NODE: resolve(false); break;
case null:
resolve(true);
break;
default:
reject(e);
break;
}
});
});
}
/**
* For the given node path, retrieve the children list and the stat. The children will
* be an unordered list of strings. Resolves the stat object if the node exists,
* resolves null if it does not exist, rejects if anything goes wrong.
* @param {string} path the Path of the node.
* @returns {Promise}
*/
existsAsync(path) {
return new Promise((resolve, reject) => {
this._client.exists(path, null, (e, stat) => {
if (e) {
reject(e);
} else {
resolve(stat);
}
});
});
}
/**
* For the given node path, retrieve the children list and the stat. The children will
* be an unordered list of strings. Resolved the children if the node exists, resolves
* null if the node does not exist. Rejects if anything goes wrong.
* @param {string} path the Path of the node.
* @returns {Promise}
*/
getChildrenAsync(path) {
return new Promise((resolve, reject) => {
this._client.getChildren(path, null, (e, children) => {
if (e) {
if (e.code == Exception.NO_NODE) resolve(null);
else reject(e);
} else {
resolve(children);
}
});
});
}
/**
* Retrieve the data and the stat of the node of the given path. Resolves an object
* containing data as a Buffer object and stat as the stat object. Resolves null
* if the node does not exist. Rejects if anything goes wrong.
* @param {string} path the Path of the node.
* @returns {Promise}
*/
getDataAsync(path) {
return new Promise((resolve, reject) => {
this._client.getData(path, null, (e, data, stat) => {
if (e) {
if (e.code == Exception.NO_NODE) resolve(null);
else reject(e);
} else {
resolve({ data, stat });
}
});
});
}
/**
* Set the data for the node of the given path if such a node exists and the optional
* given version matches the version of the node (if the given version is -1, it
* matches any node's versions). Will resolve the stat of the node if successful. Will
* reject if unsuccessful or if the node does not exist.
* @param {string} path the Path of the node.
* @param {Buffer} data the data to set on the node.
* @param {Number} version the version to set. -1 (default) to match any version.
* @returns {Promise}
*/
setDataAsync(path, data, version = -1) {
return new Promise((resolve, reject) => {
this._client.setData(path, data, version, (e, stat) => {
if (e) {
reject(e);
} else {
resolve(stat);
}
});
});
}
/**
* Retrieve the list of ACL and stat of the node of the given path. Will resolve an
* object with acls as a list of ACL objects and stat as the stat for the node. Will
* resolve null if the node does not exist and will reject if anything goes wrong.
* @param {string} path the Path of the node.
* @returns {Promise}
*/
getACLAsync(path) {
return new Promise((resolve, reject) => {
this._client.getACL(path, (e, acls, stat) => {
if (e) {
if (e.code == Exception.NO_NODE) resolve(null);
else reject(e);
} else {
resolve({ stat, acls });
}
});
});
}
/**
* Set the ACL for the node of the given path if such a node exists and the given
* version (optional) matches the version of the node on the server. (if the given
* version is -1, it matches any versions). Will resolve on success and reject
* if anything goes wrong or the node does not exist.
* @param {string} path the Path of the node.
* @param {ACL[]} acls An array of ACL instances to set on the node.
* @param {Number} version the version to set. -1 (default) to match any version.
*/
setACLAsync(path, acls, version = -1) {
return new Promise((resolve, reject) => {
this._client.setACL(path, acls, version, (e, stat) => {
if (e) {
reject(e);
} else {
resolve(stat);
}
});
});
}
/**
* Create given path in a way similar to mkdir -p. Will resolve the path if the node
* is created and will reject if anything goes wrong.
* @param {string} path the Path of the node.
* @param {Buffer} data The data buffer, optional, defaults to null.
* @param {ACL[]} acls array of ACL objects, optional, defaults to ACL.OPEN_ACL_UNSAFE
* @param {CreateMode} mode The creation mode, optional, defaults to CreateMode.PERSISTENT
* @return {Promise}
*/
mkdirpAsync(path, data = null, acls = null, mode = null) {
return new Promise((resolve, reject) => {
this._client.mkdirp(path, data, acls, mode, (e, path) => {
if (e) {
reject(e);
} else {
resolve(path);
}
});
});
}
/**
* Remove a given path in a way similar to rm -rf. Will resolve if the node is deleted
* or does not exist and reject if anything goes wrong.
* @param {string} path the Path of the node.
* @return {Promise}
*/
rmrfAsync(path) {
return new Promise((resolve, reject) => {
this.getChildrenAsync(path).then(children => {
if (!children || !children.length) {
// no children, ready to remove the node.
this.removeAsync(path).then(resolve, reject);
} else {
var promises = children.map(c => this.rmrfAsync(`${path}/${c}`));
Promise.all(promises).then(() => {
// all children removed, now remove this
this.removeAsync(path).then(resolve, reject);
}, reject);
}
}, reject);
});
}
/**
* Create and return a new Transaction instance which provides a builder object that
* can be used to construct and commit a set of operations atomically.
* @returns {AsyncTransaction}
*/
transaction() {
const transaction = this._client.transaction();
const asyncTransaction = new AsyncTransaction(transaction);
return asyncTransaction;
}
}
function createAsyncClient(connectionString, options = null) {
const clientInternal = zookeeper.createClient(connectionString, options);
return new AsyncClient(clientInternal);
}
module.exports.createAsyncClient = createAsyncClient;
module.exports.AsyncClient = AsyncClient;
module.exports.AsyncTransaction = AsyncTransaction;