Spanner (2012)
==============

Administrivia:
  Due Friday: email team + title to cs244b-staff, subject "final project title"

Learning goals
- Putting things together (2PC, Paxos, linearizability, witnesses, ...)
- The power of real-time clocks (or, when faced with a hard problem,
  change the assumptions)

What is the motivation?
  F1 is Google's bread and butter
    Needs the consistency and features of an RDBMS (strict serializability)
    Needs geographic replication
    Needs massive scalability
  Tens of terabytes were causing serious problems with MySQL
    Required years of work to re-shard the data!
  What's the workload like? (Table 6)
    Lots of read-only transactions!  Should optimize for this

What is the Spanner API?
  Looks a bit like SQL (Fig. 4, p. 255)
    Except one B-tree (index) per table
    Can interleave data from multiple tables
      Break the key space up into directories to give a locality hint to the system
      Each directory can have its own replication/placement policy
  Supports linearizable read-write transactions
  Also supports linearizable lock-free read-only transactions
    Must predeclare a read-only transaction
    Can run at a server-chosen recent time, or a client-specified time/range
    Because there are no locks, read-only transactions don't abort
      (unless old data has been garbage-collected)

How are the servers organized? (Fig. 1)
  Servers are grouped into *zones*--the unit of administrative deployment
    Each zone is physically isolated within a datacenter
  Each zone contains one hundred to several thousand *spanservers*
  Each spanserver stores 100-1,000 *tablets* ((key, timestamp) -> value map)
    Basically a B-tree with a write-ahead log
    But may encapsulate multiple directories (partitions of row space) (p. 254)
  Each tablet is managed by a Paxos state machine
    Allows tablets to be replicated to spanservers in different zones
  Zonemaster - assigns data to spanservers
  Universe master - debugging console
  Placement driver - decides when to move data, on a timescale of minutes
    Meets updated replication constraints or helps load balancing

How is data organized? (Figs. 2-3)
  (k, t) -> val mappings stored in tablets replicated in Paxos groups (see the toy sketch below)
  Keys partitioned into *directories*--contiguous ranges w. a common prefix
  Directories are assigned to (and moved between) Paxos groups
  Very large directories broken into fragments across multiple groups
    Seems necessary but not super common according to Table 5

What are the benefits of this organization?
  Scalability: most transactions should touch only a few Paxos groups
  Fault-tolerance: can survive a datacenter failure
  Read performance: often a read-only tx doesn't leave the local datacenter
  Placement flexibility: move directories to groups near their clients

How to add or remove servers in a Paxos group?
  Don't.  Just "movedir" the directory to a new tablet (Sec. 2.2, p. 254)
  Does that block access to the directory?
    Only briefly--moves the data in the background, then locks to move what changed

Why does Spanner log Paxos writes twice? (p. 253)
  The tablet has a write-ahead (re-do) log, but Paxos also requires a log
  When co-designed, should use the Paxos log as the tablet's re-do log
  Paper says they will likely remove this limitation eventually

Note the use of witnesses (parenthetical comment in Sec. 2.2 on p. 254)
  "North America, replicated 5 ways with 1 witness"
  Maybe combining Paxos with Harp-style witnesses
  Why might you want 1 witness instead of 2 among the 5 replicas?
    Replicas can help scale read workloads and place data closer to clients
    Maybe witnesses absorb load in getting partitioned nodes back up to date?
  Beyond that, it's unclear whether Spanner's witnesses differ from Harp's
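
To make the (key, timestamp) -> value mapping from "How is data organized?" concrete,
here is a toy multi-version store in Python.  This is only a sketch of the idea (the
MVStore name and its methods are invented), not Spanner's tablet code, and it ignores
the B-tree layout, the write-ahead log, and Paxos replication entirely.

    class MVStore:
        """Toy multi-version store: key -> list of (timestamp, value), newest last."""
        def __init__(self):
            self.versions = {}

        def write(self, key, ts, value):
            # Within one Paxos group, commit timestamps increase monotonically,
            # so appending keeps each per-key version list sorted by timestamp.
            self.versions.setdefault(key, []).append((ts, value))

        def read_at(self, key, ts):
            # Snapshot read: return the newest version with timestamp <= ts.
            for version_ts, value in reversed(self.versions.get(key, [])):
                if version_ts <= ts:
                    return value
            return None

    store = MVStore()
    store.write("comment/A", 10, "first!")
    store.write("comment/A", 20, "edited")
    assert store.read_at("comment/A", 15) == "first!"   # a read at t=15 sees the t=10 version

Keeping a history of versions per key is what later lets read-only transactions run
lock-free at a chosen timestamp.
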
In straight-up vanilla Paxos, both reads and writes go through the same protocol
  The leader must wait another round trip to hear from a quorum
  Why not handle a read locally at the leader in vanilla Paxos (there's no data to replicate)?
    A later leader could already have externalized writes, violating linearizability
  How do we fix vanilla Paxos to handle reads at the leader?
    Nodes grant the leader a lease--a promise not to ack other leaders for time T
    Given leases from a quorum, the leader knows there is no other leader, so it can read locally
    Assumes bounded clock drift

Let's imagine a simpler straw man #1 to understand Spanner's design
  Assume the asynchronous system model (like VR, Raft, and Paxos) + leases
  Suppose each transaction stays entirely within one Paxos group
    Hence, no synchronization between Paxos groups, no fancy time stuff
    (This assumption will break down with the need to fragment tablets)
  Use two-phase locking for concurrency control
    Acquire locks during a transaction, release them on commit
    Ensures linearizability of transactions within a Paxos group
    The Paxos leader maintains a lock table with shared and exclusive locks
    If the leader fails or loses its lease, can always abort the transaction

What could go wrong in straw man #1?
  Within one Paxos group, everything is totally ordered, but not across groups
  E.g., A and B concurrently post comments to different Paxos groups
    C & D concurrently read both; C sees A's comment but not B's, D vice versa
  This violates *external consistency*

Straw man #2: Implement cross-Paxos-group transactions with locking
  A transaction must only commit if its locks in all Paxos groups are intact
    So C and D at some point read-lock both A's and B's comments simultaneously
  Use two-phase commit across Paxos groups (like Viewstamped Replication)
    Just pick one of the Paxos groups to act as the 2PC coordinator
  What about two-phase commit's lack of fault-tolerance?
    Okay, because the Paxos groups themselves are fault-tolerant

What's wrong with straw man #2?
  That's a lot of locking, especially with many read-only transactions
    In the above example, reads by C and D must lock A's and B's comments
    That's a lot of load on the leader (which must handle all reads)
  It might not make sense to cram everything into one transaction
    E.g., the decision to load A's and B's comments might be made in the browser
    The browser could fail or be slow--shouldn't hold even read locks that long

How does Spanner solve this?
  It totally ditches the asynchronous model, using TrueTime

What is TrueTime?
  An API for retrieving an estimate of the current time
  Can't globally synchronize exact time, so it returns a bounded interval:
    TT.now() -> TTinterval: [earliest, latest]
  Requires hardware (GPS receivers, atomic clocks)
    Uses a local daemon on each machine to coordinate
  Assumes bounded clock drift (200 us/sec = 0.02%)--is this reasonable?
    Sec. 5.3 says clocks fail 6 times less often than CPUs, so maybe
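
A toy Python stand-in for this interface, to make the later timestamp rules easier to
follow.  It is a sketch under assumptions, not the real implementation: the real system
derives the uncertainty bound epsilon from GPS/atomic-clock time masters plus a
per-machine daemon, whereas here epsilon is just a constant we pick; after()/before()
mirror the convenience predicates in the paper's TrueTime API.

    import time
    from collections import namedtuple

    TTInterval = namedtuple("TTInterval", ["earliest", "latest"])

    class TrueTime:
        def __init__(self, epsilon_s=0.007):   # assumed bound; the real epsilon varies over time
            self.epsilon_s = epsilon_s

        def now(self):
            # Local clock, assumed to be within epsilon_s of true (absolute) time.
            t = time.time()
            return TTInterval(t - self.epsilon_s, t + self.epsilon_s)

        def after(self, t):
            # True only once t has definitely passed on every clock in the system.
            return self.now().earliest > t

        def before(self, t):
            # True only while t has definitely not yet arrived anywhere.
            return self.now().latest < t
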
Idea: Use real time to order all transactions globally
  Either A's or B's comment will have the later timestamp
  If you see the effects of the later transaction, you're guaranteed to see the earlier one

How does time eliminate the need for read locking?
  Assign each transaction a timestamp that preserves linearizability
    (I.e., if A committed before B started, then s_A < s_B)
    Within a Paxos log, transaction timestamps must increase monotonically
  Tablets store a history of values and allow reading at a particular time
    Just read values at a read-only transaction's timestamp to get linearizability
    This is often known as *snapshot isolation*
  Additional benefit: reads can now be spread across the entire Paxos group
    So long as the replica knows the history through the transaction's timestamp
    If the replica fails?  Try another one with the same timestamp
      That's why read-only transactions can't fail (modulo garbage collection)

How does a read-write transaction proceed?
  First, the client reads a bunch of data, acquiring read locks as needed
  When done, all writes are buffered at the client, which holds only read locks
    Note you don't read your own writes during the transaction
    Okay, since reads return a timestamp, which uncommitted writes don't have
  The writes and lock releases must be sent to one or more Paxos groups
  Then somehow pick a timestamp for the transaction and atomically commit

Let's say the whole transaction involves only one Paxos group--what happens?
  The client sends its writes to the group leader in a *commit request*
  The leader must pick a timestamp s--how does it proceed?
    s must be greater than that of any previously committed transaction in the Paxos log
    Two-phase locking implies a period where all locks are held simultaneously
      Logically we want the transaction timestamp to lie within this period
      I.e., want s greater than the time of any lock acquisition
      So, conservatively, ensure s > TT.now().latest at commit-request receipt
  *Commit wait*: must wait until s < TT.now().earliest before replying--why? (sketched below)
    Suppose you write A, it completes, then you write B
    W/o commit wait, B could be on a different server and get a lower timestamp
    A read tx between the two timestamps would see B but not A, violating external consistency

What happens when a read-write transaction involves multiple groups?
  The client picks one group to be the two-phase commit coordinator
  Sends a commit request to the *coordinator leader* (Paxos leader of that group)
    The coordinator records TT.now().latest at the time it receives the commit request
  The coordinator broadcasts a vote request to the other participant leaders
  On receipt of the vote request, the participant leaders must:
    Acquire any write locks that will be necessary for the transaction
    Pick a *prepare timestamp* to send back with VOTE-COMMIT
      The prepare timestamp must lie within the term of the leader's lease
      Must also be > all committed transactions--why?  Monotonicity
  Once all participant leaders send VOTE-COMMIT to the coordinator leader
    The coordinator picks a timestamp s such that:
    - s > TT.now().latest when the commit request was originally received
    - s >= the max prepare timestamp received from the participant leaders
    - s lies within the lease terms of all participant leaders
    Again, "commit wait" until s < TT.now().earliest before returning

How to implement leader leases with TrueTime?
  Need all leases to be disjoint, so rely on TrueTime
  Could you make do with just the bounded-clock-drift assumption (no GPS, etc.)?
    Problem is, leaders must pick timestamps within their lease intervals

What happens for a read-only transaction?
  A client scope expression specifies all Paxos groups it will read
  Pick a timestamp s for the transaction
  Use snapshot isolation at s on reads--when can spanservers respond at s?
    Must have s <= t_safe, where t_safe = min(t_Paxos, t_TM)
    t_Paxos = latest timestamp in the Paxos log (works because of monotonicity)
    t_TM = infinity if there are no pending transactions,
           otherwise the lowest prepare timestamp of a transaction with unknown outcome
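
A tiny sketch of this safe-time rule as stated above, assuming the replica tracks its
Paxos apply point and the prepare timestamps of undecided transactions (the function
names here are invented for illustration):

    def t_safe(t_paxos, prepare_timestamps):
        # t_paxos: timestamp of the latest write this replica has applied from
        #   the Paxos log (monotonically increasing within a group).
        # prepare_timestamps: prepare timestamps of transactions prepared here
        #   whose commit/abort outcome is still unknown.
        t_tm = min(prepare_timestamps) if prepare_timestamps else float("inf")
        return min(t_paxos, t_tm)

    def can_serve_snapshot_read(s, t_paxos, prepare_timestamps):
        # Serve a read at s only once s <= t_safe; otherwise wait,
        # or let the client retry at another replica of the group.
        return s <= t_safe(t_paxos, prepare_timestamps)
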
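Returning to the read-write commit path, a minimal sketch of the leader/coordinator
picking s and performing commit wait, reusing the TrueTime sketch from earlier.
apply_via_paxos is an assumed hook standing in for replicating the buffered writes
through the group's Paxos log; this is illustrative, not Spanner's code.

    import time

    def commit_with_commit_wait(tt, lower_bounds, writes, apply_via_paxos):
        # Pick s above TT.now().latest (taken here, i.e. after the commit request
        # arrived) and above every other lower bound the caller passes in, such as
        # participant prepare timestamps and anything already committed in this group.
        s = max([tt.now().latest] + list(lower_bounds))

        apply_via_paxos(writes, s)        # log and apply the writes at timestamp s

        # Commit wait: don't release locks or reply until s has definitely passed
        # everywhere, i.e. until TT.after(s) holds, so no later transaction anywhere
        # can be assigned a smaller timestamp.
        while not tt.after(s):
            time.sleep(tt.epsilon_s / 4)  # expected wait is roughly 2 * epsilon

        return s                          # now safe to release locks and ack the client
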
How to pick s for a single-Paxos-group read-only transaction?
  Safe: use TT.now().latest at the time the request is received
  Better: if there are no pending (prepared but uncommitted) transactions,
    use LastTS(), the timestamp of the last committed write

How to pick s for a multi-group read-only transaction?
  Just use TT.now().latest
    Could add a round querying all groups to check for no pending transactions + LastTS()
  Either way, have to wait until TT.now().earliest > s

Schema changes
  No explicit locking, because humans ensure only one schema change runs at a time
  Hence, can pick a timestamp far in the future as the "flag point"

Explain Figure 5--what causes the shape of leader-hard?
  Remaining lease times are distributed between 0 and 10 seconds

How to implement secondary indexes?
  Transactions are all you need, as clients can atomically update table + index
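
For instance, a hypothetical client-side transaction (run_txn and txn.read/write/delete
are invented names, not Spanner's client API) that keeps a users table and a by-email
index consistent: because both mutations commit in one read-write transaction, readers
see either both changes or neither.

    def update_email(run_txn, user_id, new_email):
        def body(txn):
            old_email = txn.read(("users", user_id))["email"]
            txn.write(("users", user_id), {"email": new_email})              # base table row
            txn.delete(("users_by_email", old_email))                        # stale index entry
            txn.write(("users_by_email", new_email), {"user_id": user_id})   # fresh index entry
        run_txn(body)   # the transaction layer retries or aborts this as a unit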