Zookeeper
=========

What problem is this paper trying to solve?
  Coordination in large-scale distributed systems
  Examples: configuration, group membership, leader election, locks
Why not build coordination directly into each distributed application?
  Need at least three servers for fault tolerance
    Otherwise, network partition could lead to two masters/inconsistency
    But maybe only want two replicas of a server
    So use 3+ zookeeper nodes to coordinate master/backup replica
  Distributed coordination is hard to get right
    More chance to get it right *once* than in every app
    But couldn't you just use a high-quality library?
  Still administrative overhead leaving room for operator error
    Must ensure servers know about each other and how to talk to each other
    Dynamic servers can use zookeeper to find each other
  Google does it (published chubby paper so everyone wanted to do that)
What are the design goals of Zookeeper?
  General "coordination kernel" API that supports a lot of use cases
  Good performance
  Fault tolerance / high availability
Let's go over the API
  Main abstraction: znodes--what's this?
    Hierarchically named like file system
    But each znode can have both data and children (like file+dir in one)
What state does zookeeper keep for each znode?
  - Type: regular or ephemeral (which disappears when client does)
  - Metadata: timestamp, version
  - Data: up to 1MB, configurable
  - Children: other znodes
  - Counter: used when creating sequential child znodes
What operations can we do on znode?
  - create(path, data, flags)
      flags specifies regular/ephemeral and optionally sequential
      sequential appends counter++ to znode name
      fails if znode already exists (at least in ephemeral mode--p. 6)
  - delete(path, version)
      fails if version != -1 and doesn't match znode
  - exists(path, watch)
      Does path exist?
      If no and watch, then will get notified when created
      Note watch is a one-time notification
  - getData(path, watch)
      Get znode data, if watch then will get notified when znode changes
  - setData(path, data, version)
  - getChildren(path, watch)
  - sync(path)
      path is ignored
What do they mean by asynchronous API?
  Can issue requests and get called back when they complete
  A common alternative to threads for issuing concurrent requests
What's a zookeeper session?
  Connection between a client and servers (associated with watches, etc.)
  State replicated, so if server fails, client can connect to another one
What are the consistency guarantees of the API?
  1. Linearizable writes
  2. FIFO client order
What is linearizability?
  A consistency model (See reference [14])
  Observed effects equivalent to all operations happening in some sequential
    order in which non-overlapping operations are temporally ordered
  E.g.,  |---A---|       |---C---|
                     |---B---|
    A must happen before B
    B and C could happen in either order, but everyone sees same order
  Linearizability is a nice property for distributed systems
    Objects can implement it locally (provided no cross-object transactions)
    But still makes it easy to reason about systems
    Example: write value, pick up phone and call friend, friend reads value
      Linearizability guarantees friend will see your write
What is A-linearizability?
  That's where #2 (FIFO client order) comes in
  Lets single client issue overlapping operations & respects FIFO order
  Otherwise, concurrency requires "virtual clients" w. no ordering
Wait, how can only the writes be linearizable?
  Means example of write, call friend, friend reads won't work?
  Why do they do this?
    Read performance--any server can answer read requests
    Means total reads/second increases with growing number of servers
What would zookeeper need to do to provide linearizable reads?
  Could handle reads just like writes--replicate everywhere
    Would send reads through atomic broadcast protocol
  Could send all reads to an elected "primary" zookeeper server
    Would require primary to have *lease* so it knows it is still primary
  What's the workaround?  Issue sync [cf. reading question]
What's an atomic broadcast protocol?
  Consensus applied to a series of messages
  (All nodes agree m1 is first message, m2 second message, etc.)
Why does zookeeper need an atomic broadcast protocol?
  All servers must converge on identical state, even after failures
  Common technique is replicated state machine (RSM):
    1. All servers agree on system's initial state (hard-coded)
    2. All servers agree on each deterministic update before applying it
What are leases?
  A time-limited promise by system to notify you before changing some state
  E.g., primary lease means primary knows it is still primary for 30 seconds
Paper knocks Chubby for using client leases--why are these bad?
  Server promises "For 30 sec, I'll tell you before changing value x"
  Then client fails, so server can't notify it
  Delays updating x until lease expires
Why doesn't zookeeper need client leases?
  Zookeeper delivers watch notifications *after* changes happen
  But note session timeout mechanism can effectively be used for leases
    Need ephemeral lock node to disappear after client failure?
    Must wait for client session to time out
Example: Ready znode
  Master deletes ready znode, changes config, re-creates ready znode
  Clients call getData(ready, true) to get notified when deleted
  What happens if client sees ready before new master deletes it?
    E.g., client: getData(ready, true); getData(config1); getData(config2)
    Don't want to see inconsistent config1, config2 as new master writing
    FIFO ordering guarantees delivery of watch from ready before config data
    So client knows to ignore the results of config1, config2
    Just calls exists(ready, true) and restarts read when ready exists
Example: Simple lock
  Create ephemeral lock znode--if succeeds, you have the lock
  If fails, call getData(lock, true), notified when deleted
  If client session fails, lock automatically released
  What's wrong?  Herd effect
    All clients woken up when lock released, but only one can get lock
How to do lock w/o herd effect?
  Define lock holder as creator of znode with lowest sequence number
  To acquire, create ephemeral and sequential child of lock znode
  Call getChildren()
    Now you know if you have the lock
    Otherwise, know who is before you in line to get the lock, call it p
  Call exists(p, true) [in case deleted in meantime, and to watch for it]
  When p no longer exists, go back to the getChildren() step--why?
    p is ephemeral, might disappear from session timeout without ever
      holding the lock
  To release lock, delete your child znode
How to do Read/write lock?
  Same as previous, but readers wait for previous writer, not any locker
  Encode read/write intent in znode name ("lock/read-SEQ" or "lock/write-SEQ")
Double barrier?
  Use ready node trick, and create when you are nth to join
Are zookeeper's API functions idempotent?
  Definitely not.  E.g., sequential znode create bumps counter
Why does section 4.1 say "transactions are idempotent"?
  API calls get translated into transactions
  Transactions sent through atomic broadcast, replicated on all servers
What's the issue with calculating "future state"?
  Zookeeper pipelines operations
    So may have multiple transactions where atomic broadcast not complete
    Can't apply state if transactions not committed
  But to make idempotent, need to know result (e.g., new counter value)
    So calculate state based on previous pending transactions, too
What is the advantage of idempotent transactions?
  Allows write-ahead logging
  Also, makes fuzzy snapshot mechanism work
What evaluation questions should we ask?
  Performance (reads/writes per second)
  Correctness/fault tolerance (not really evaluated)
Table 1 (p. 10)--why more reads fewer writes with more servers?
  Reads can be handled by any server, so more servers = more reads/sec
  Writes go through atomic broadcast, requires work from all servers,
    plus communication overhead + tail latency
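
Sketch: idempotent transactions and "future state"
  The notes on section 4.1 say API calls get translated into idempotent
  transactions computed against future state. Below is a minimal Python
  sketch of that idea, not ZooKeeper's actual code. The names SetDataTxn,
  Leader, prepare_set_data, and apply_txn are hypothetical, and the model
  tracks only a per-znode version number. The point it tries to show: the
  transaction records the *result* (the new version), so replaying the log
  over a state that already contains some of the transactions ends in the
  same state.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple


@dataclass(frozen=True)
class SetDataTxn:
    """Hypothetical idempotent transaction: stores the outcome, not the request."""
    path: str
    data: bytes
    version: int  # version the znode has once this transaction is applied


class Leader:
    """Toy leader that converts setData requests into transactions."""

    def __init__(self, committed_versions: Dict[str, int]):
        # "Future state": versions as they will be after all pending
        # (broadcast but not yet committed) transactions are applied.
        self.future = dict(committed_versions)

    def prepare_set_data(self, path: str, data: bytes,
                         expected_version: int) -> Optional[SetDataTxn]:
        current = self.future[path]
        # Conditional update: -1 means "any version" (as in the API notes).
        if expected_version != -1 and expected_version != current:
            return None  # assumption: a real system would emit an error txn
        txn = SetDataTxn(path, data, current + 1)
        self.future[path] = txn.version  # later requests see this result
        return txn


def apply_txn(state: Dict[str, Tuple[bytes, int]], txn: SetDataTxn) -> None:
    """Replica-side apply: overwrites with the recorded result."""
    state[txn.path] = (txn.data, txn.version)
```

  Usage idea: prepare two pipelined writes to the same znode before either
  commits. The second is checked against version 1 (the future state), not
  the committed version 0. Replaying both transactions in log order over a
  replica that already applied them leaves its state unchanged, which is
  the property the fuzzy snapshot mechanism relies on. A transaction that
  said "increment version" instead would double-count on replay.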