Monday, October 31, 2011

Piccolo: Building Fast, Distributed Programs with Partitioned Tables and Spark: Resilient Distributed Datasets (RDDs)

Piccolo is an in-memory programming model for large applications in data centers. It allows programs to access mutable, distributed in-memory state through a key-value data model. Piccolo differs from existing solutions based on MPI and from data flow models like MapReduce, which expose no global state. With Piccolo, developers only need to define kernel functions, which are distributed across machines and access the shared distributed state. By exposing global shared state, Piccolo makes many types of applications easier to write. Applications access state with simple get() and put() APIs, and can define accumulator functions to resolve conflicting updates. Developers can also express locality preferences so that kernels run near the data they mostly access, reducing long network delays. A master process coordinates the kernels running on the workers, and for load balancing, workers can "steal" un-started kernels from other workers. For fault tolerance, Chandy-Lamport checkpoints are taken online; simply restarting kernels would not work, since the shared state may already have been mutated. Many different types of applications have been built with Piccolo, such as a distributed crawler, PageRank, k-means clustering, and matrix multiplication. In experiments, scaling was decent when adding workers, and also when scaling the input size along with the number of workers on a local cluster; on EC2, scaling was closer to linear. Piccolo implementations of PageRank and k-means clustering outperformed their Hadoop counterparts, mostly because Hadoop had to sort lots of data, serialize keys, and read/write HDFS, overheads that Piccolo largely avoids. Experiments with work stealing also showed that performance can improve greatly when load in the cluster is imbalanced.
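
To make the kernel model concrete, here is a minimal Python sketch of how a PageRank-style kernel might look against a Piccolo-like table; the Table class, its get()/update() methods, and the accumulator hook are hypothetical stand-ins for the paper's actual C++ interface.

    # Hypothetical Piccolo-like table: concurrent updates to a key are merged
    # with a user-supplied accumulator instead of requiring locks.
    class Table:
        def __init__(self, accumulator):
            self.accumulator = accumulator          # e.g. sum, min, max
            self.data = {}

        def get(self, key):
            return self.data.get(key, 0.0)

        def update(self, key, value):
            self.data[key] = self.accumulator(self.data.get(key, 0.0), value)

    def pagerank_kernel(my_pages, links, curr_ranks, next_ranks):
        """One iteration over the pages assigned to this kernel instance."""
        for page in my_pages:                       # locality: pages co-located with this worker
            share = curr_ranks.get(page) / len(links[page])
            for out in links[page]:
                next_ranks.update(out, share)       # possibly a remote put; conflicts accumulate

    # Accumulator that sums contributions arriving from different kernels.
    curr_ranks = Table(accumulator=lambda a, b: a + b)
    next_ranks = Table(accumulator=lambda a, b: a + b)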

Spark and resilient distributed datasets (RDDs) provide a distributed memory abstraction for writing in-memory applications on large clusters. RDDs achieve fault tolerance for in-memory data by only allowing coarse-grained transformations on it. RDDs are extremely useful for reusing data and computation and for iterative algorithms, since the working data can be kept in memory. Existing systems provide fault tolerance by copying data or logging updates, but RDDs provide efficient fault tolerance by recording the lineage of transformations and re-computing lost partitions. Developers can control both the partitioning of RDDs and their persistence in memory. The Spark programming interface for RDDs is similar to DryadLINQ and FlumeJava: new RDDs are defined by applying transformations to existing ones. Because the transformations are coarse-grained, the RDDs are immutable, and the lineage is remembered, RDDs behave very differently from distributed shared memory. Representing an RDD requires five pieces of information: partitions(), preferredLocations(), dependencies(), iterator(), and partitioner(). There are two classes of dependencies: narrow, where each partition of the parent RDD is used by at most one partition of the child RDD, and wide, where a parent partition is used by many child partitions. Narrow dependencies can be pipelined and allow for more efficient recovery. In experiments, Spark ran iterative algorithms up to 20x faster than Hadoop, and analytics programs up to 40x faster. Recovery was fast because only the failed partitions of the RDDs have to be regenerated. Also, a 1 TB dataset could be interactively queried within 5-7 seconds.
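
To make the transformation/lineage model concrete, here is a small sketch using the later PySpark API; the HDFS path and the error-counting logic are invented for the example.

    from pyspark import SparkContext

    sc = SparkContext("local[*]", "rdd-lineage-sketch")

    # Each transformation defines a new RDD; nothing runs until an action is called.
    lines  = sc.textFile("hdfs://example/logs")        # hypothetical input path
    errors = lines.filter(lambda l: "ERROR" in l)      # narrow dependency
    pairs  = errors.map(lambda l: (l.split()[0], 1))   # narrow dependency
    counts = pairs.reduceByKey(lambda a, b: a + b)     # wide (shuffle) dependency

    counts.persist()            # keep this RDD in memory for reuse across queries
    top = counts.take(10)       # action: triggers the actual computation

    # If a partition of `counts` is lost, Spark re-runs only the lineage
    # (textFile -> filter -> map -> reduceByKey) needed to rebuild that partition.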

These two distributed in-memory systems solve the problem in different ways. Piccolo focuses on mutable, fine-grained updates, whereas Spark handles coarse-grained transformations. There are tradeoffs between the systems, but clearly the future will still need both types of models. Certain applications, such as web applications, need asynchronous, fine-grained updates, and others need batch processing over lots of data. The fine-grained approach is most similar to traditional RDBMSes, and the coarse-grained approach is most similar to OLAP systems. In the future, the fine-grained approaches will have to incorporate more batch/performance techniques like those in Spark, and the coarse-grained approaches will try to incorporate more asynchronous and immediate updates and processing.

Relational Cloud: A Database-as-a-Service for the Cloud and Database Scalability, Elasticity, and Autonomy in the Cloud

Relational Cloud is an effort to build a database-as-a-service in the cloud, taking operational tasks off users' hands and thereby lowering their costs. There are several database-as-a-service products already, such as Amazon RDS and Microsoft SQL Azure, but Relational Cloud additionally tries to handle efficient multi-tenancy, elastic scalability, and database privacy. Running many database instances efficiently is advantageous because more databases can be consolidated onto fewer machines, reducing costs. The basic design uses existing, unmodified RDBMSes as backend storage servers, with clients coordinating queries and communication among them. Each tenant gets separate databases and tables, which are partitioned, and partitions can be migrated between servers. CryptDB-enabled client libraries provide privacy by encrypting data so that the servers can execute queries directly over encrypted data. The partitioning strategy is workload-aware: periodically, workload traces are analyzed to identify sets of tuples that are accessed together. The Kairos component gathers performance statistics such as CPU usage and RAM. The working set of a workload is determined by slowly growing a probe table until I/O starts to increase. A model has been developed to predict how multiple databases and workloads will combine on a single server, and a placement model assigns partitions to servers so as to minimize the number of servers while balancing load. Adjustable security is achieved by layering several encryption schemes of decreasing strength; weaker layers support more query operations, and the system decrypts data only as far as needed to execute each query. In experiments with several tenants, Relational Cloud achieved consolidation ratios of 6:1 to 17:1 and performed much better than databases in VMs, because a single server can allocate resources more effectively and share one log and buffer pool.
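
As a rough illustration of the adjustable-security idea (decrypt only as far as a query requires), here is a toy Python sketch; the layer names echo CryptDB's onion layers, but the dictionaries, lowest_layer_needed(), and adjust() are invented simplifications, not the actual implementation.

    # Encryption layers from strongest (least functionality) to weakest (most functionality).
    LAYERS = ["RND",    # randomized: storage only, no computation possible
              "DET",    # deterministic: supports equality predicates and joins
              "OPE"]    # order-preserving: supports range predicates and ORDER BY

    # Minimal layer each query operation requires (illustrative mapping).
    REQUIRED = {"store": "RND", "equality": "DET", "range": "OPE"}

    def lowest_layer_needed(operations):
        """Return the weakest layer the column must be exposed at for these operations."""
        return max((LAYERS.index(REQUIRED[op]) for op in operations), default=0)

    def adjust(column_state, operations):
        """Peel layers (by releasing decryption keys to the server) only as far as needed."""
        needed = lowest_layer_needed(operations)
        while column_state["layer"] < needed:
            column_state["layer"] += 1          # peel one onion layer; never re-encrypts
        return LAYERS[column_state["layer"]]

    state = {"layer": 0}                        # column starts fully randomized
    print(adjust(state, ["equality"]))          # -> "DET"
    print(adjust(state, ["range"]))             # -> "OPE"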

When running databases in the cloud, scalability, elasticity, and autonomy are important features. RDBMSes are traditionally seen as lacking these features and as unable to scale to the cloud. Key-value stores, on the other hand, have become popular in cloud architectures because of their highly independent access patterns. However, applications sometimes need transactions and stronger consistency, and several techniques can be used to provide them: one can start with key-value stores and add transactional features, or start from RDBMSes and add key-value-store-like scalability. Data fusion is a technique for key-value stores that groups keys into entities in order to provide transactions across them; G-Store employs this technique, along with dynamic partitioning of keys (a toy sketch follows below). Data fission is a technique for DBMSes that partitions databases into shards and provides full database semantics within each shard independently, which allows for scalability similar to key-value stores. Elasticity is important for starting up or powering down servers according to utilization. Live migration is a key technique for achieving elasticity in both shared-disk and shared-nothing systems: iterative copy was developed for shared-disk systems, and Zephyr for shared-nothing systems. For autonomy, machine learning algorithms are used to model tenant behavior and placement, and to determine when to migrate, what to migrate, and where to migrate it.
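
To illustrate the data-fusion idea, here is a toy sketch that groups keys into an entity group and allows multi-key transactions only within one group, in the spirit of G-Store's key groups; the KeyGroup class and its API are invented for the example.

    class KeyGroup:
        """Toy entity group: multi-key transactions are allowed only inside one group."""
        def __init__(self, keys):
            self.keys = set(keys)
            self.store = {k: 0 for k in keys}

        def transact(self, updates):
            # All-or-nothing update, but only over keys owned by this group.
            if not set(updates) <= self.keys:
                raise ValueError("transaction spans multiple groups; not allowed")
            snapshot = dict(self.store)
            try:
                for k, delta in updates.items():
                    self.store[k] += delta
            except Exception:
                self.store = snapshot           # roll back on failure
                raise

    cart = KeyGroup(["user:42:cart", "user:42:total"])
    cart.transact({"user:42:cart": 1, "user:42:total": 20})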

Database systems in the cloud will be very important as the cloud model becomes more popular. There are several public clouds that support multi-tenancy and other cloud features. However, I don't think this model will be very useful for large companies or very large datasets. Multi-tenant databases on public clouds will be useful for small to medium databases, under the assumption that the workload or size will not change much. If the data grows too large and the workload becomes too demanding, a dedicated system will perform much better and with better isolation, and multi-tenancy will not help. In addition, companies may want to keep their own data rather than put it in the public cloud, even with privacy and security built in. Many of the multi-tenancy concepts should be useful in an internal setting, where companies keep their data in-house while individual tenants do not have to worry about the details. Maybe there will be products or open source solutions for reliable multi-tenancy on local clusters.

Saturday, October 22, 2011

BOOM Analytics: Exploring Data-Centric, Declarative Programming for the Cloud and Erlang - A survey of the language and its industrial applications

BOOM Analytics is an effort to build large distributed systems in a data-centric, declarative way. The higher level of abstraction may simplify the code and speed up development and debugging. Building large distributed systems for the cloud can be very difficult, because it requires the developer to be careful about concurrent programming and fault-tolerant communication. BOOM attempts to make this easier with data-centric design and declarative programming. Overlog is used as the declarative language for these cloud systems. Overlog is based on Datalog, a deductive query language in which logical rules describe which tuples belong to which relations. Overlog extends Datalog with notation for data location, SQL features, and a well-defined model for generating changes to tables. Each Overlog timestep has three phases: inbound events are converted to tuple insertions and deletions, local rules are run until a fixed point is reached, and then outbound events are emitted. HDFS and the Hadoop scheduler were reimplemented with BOOM, along with additional availability and scalability features, in far fewer lines of code and with higher confidence in correctness. With BOOM it was easy to implement first-come-first-serve and LATE scheduling policies for Hadoop, and the experimental results mirror the original LATE results. Performance experiments show that the BOOM implementations were slightly slower than the original implementations.
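
A rough Python sketch of the three-phase timestep described above: absorb inbound events, run local rules to a fixed point, then emit outbound tuples. Representing rules as functions from the current tables to newly derived tuples is a simplification for illustration, not Overlog's actual evaluator.

    def timestep(tables, rules, inbound):
        # Phase 1: apply inbound network events as tuple insertions.
        for relation, tuples in inbound.items():
            tables.setdefault(relation, set()).update(tuples)

        # Phase 2: run local rules until no rule derives anything new (fixed point).
        changed = True
        while changed:
            changed = False
            for rule in rules:                       # rule: tables -> {relation: tuples}
                for relation, tuples in rule(tables).items():
                    new = tuples - tables.setdefault(relation, set())
                    if new:
                        tables[relation] |= new
                        changed = True

        # Phase 3: outbound tuples (e.g. tuples destined for other nodes) are emitted.
        return tables.pop("outbound", set())

    # Example rule: every node we have heard a heartbeat from is a live member.
    rules = [lambda t: {"member": {n for (n, _) in t.get("heartbeat", set())}}]
    tables = {}
    timestep(tables, rules, {"heartbeat": {("node1", 1), ("node2", 1)}})
    print(tables["member"])          # {'node1', 'node2'}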

Erlang is a functional programming language for soft real-time systems. Erlang was designed for requirements such as very large programs, control systems that cannot be stopped, portability, high levels of concurrency, inter-process communication, distribution, efficient garbage collection, incremental code loading, timeouts, and external interfaces. Sequential programs are sets of recursive equations, and functions are primitive data types. The spawn() primitive creates a new process for concurrent programming, and send() and receive() are used for message passing between processes. Message sending is asynchronous, and messages are delivered in order. spawn() can also be used to create a process on a different node. Several commercial products use Erlang, and some conclusions are that development time can be shorter, performance is very good for soft real-time systems, and the real-time garbage collection works well.
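
Erlang's spawn/send/receive style can be loosely imitated in Python with OS processes and queues; the sketch below is only an analogy for the message-passing model (real Erlang uses the ! operator and receive expressions, and its processes are far lighter-weight than OS processes).

    from multiprocessing import Process, Queue

    def echo_server(inbox, outbox):
        """A 'process' that waits for messages and replies, like an Erlang receive loop."""
        while True:
            msg = inbox.get()                 # blocks until a message arrives (in order)
            if msg == "stop":
                break
            outbox.put(("echo", msg))

    if __name__ == "__main__":
        inbox, outbox = Queue(), Queue()
        p = Process(target=echo_server, args=(inbox, outbox))   # analogous to spawn()
        p.start()

        inbox.put("hello")                    # asynchronous send, like Pid ! hello
        print(outbox.get())                   # ('echo', 'hello')

        inbox.put("stop")
        p.join()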

Both Erlang and Overlog provide declarative or functional programming models for distributed cloud systems. Both papers conclude that it takes less time and less code to implement a system this way, and that the result inspires higher confidence in its correctness. However, I think most people do not think or program in a functional, logic-rule-based style. Certain classes of problems are natural to implement declaratively, such as well-defined state machines like Paxos. In the future, there will be new languages or better language interoperability to allow inserting segments of declarative, functional code for certain tasks, such as communication logic or other protocols.

PNUTS: Yahoo!'s Hosted Data Serving Platform and Don't Settle for Eventual: Scalable Causal Consistency for Wide-Area Storage with COPS

PNUTS is a scalable, geographically distributed data store for web applications. Web services usually require scalability, low response times, high availability, and only relaxed consistency, and PNUTS provides all of these. PNUTS also provides features other systems usually don't, such as geographic distribution and a per-record timeline consistency model instead of eventual consistency. Timeline consistency is achieved per record by designating one replica as the master for that record. APIs exist to read the latest version of a record or to accept somewhat older versions, depending on what the application needs. Data is partitioned into tablets, either by range or by hash, and each tablet is replicated. Replication across regions is asynchronous, via the reliable message broker, which is like a pub/sub system; it functions both as a durable write-ahead log and as the replication mechanism. Messages are delivered in order, and this property is used to achieve per-record timeline consistency. In experiments, the average latency for client requests was around 100 ms, latency increased as the fraction of writes in the system increased, and increasing the number of storage servers decreased latency almost linearly.
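
The per-record API gives applications a choice of consistency per read; the sketch below shows how an application might use it. The operation names mirror the paper's API (read-any, read-latest, test-and-set-write), but the pnuts client object and its Python signatures are invented for illustration.

    def update_avatar(pnuts, user_key, new_avatar):
        # Read the current record; read_latest goes to the record's master if needed.
        record, version = pnuts.read_latest("users", user_key)

        # test_and_set_write only succeeds if the record is still at `version`,
        # giving read-modify-write semantics on the record's timeline.
        record["avatar"] = new_avatar
        return pnuts.test_and_set_write("users", user_key, record,
                                        required_version=version)

    def show_profile(pnuts, user_key):
        # A stale-but-fast read is fine for rendering a profile page:
        # read_any may return any (possibly older) version from a nearby replica.
        record, _ = pnuts.read_any("users", user_key)
        return record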

COPS is an ALPS (availability, low latency, partition tolerance, high scalability) key-value store that provides the causal+ consistency property. The ALPS properties are desirable for internet services, but COPS also provides stronger consistency guarantees, which make it easier to develop against. The causal property ensures that the system maintains the causal dependencies between operations, and the "+" adds convergent conflict handling, which ensures replicas resolve conflicts in the same way so that they do not diverge permanently. The three sources of potential causality are ordering within an execution thread, a get() that returns the value of an earlier put(), and transitivity. COPS is the first system to provide the causal+ property in a scalable way, without a single master. COPS relies on a linearizable key-value store within each cluster and replicates asynchronously across clusters using a replication queue. Additional operations are used to ensure the causal+ property: get_by_version(), put_after(), and dep_check(). There is also metadata per record to store dependencies, and, for COPS-GT, older versions. Garbage collection occasionally clears out old versions and unnecessary dependencies; only about 5 seconds of old metadata needs to be kept, since get transactions are only allowed to run for 5 seconds. In experiments, throughput increased as the put:get ratio decreased, because the number of dependencies decreased. COPS-GT usually has lower throughput than COPS because of its stronger guarantees, but as the value size increases, the inter-operation time increases and COPS-GT approaches similar performance. Scalability for COPS is also much better than for single-master, log-shipping systems.
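
Here is a rough sketch of how a client library might track causal dependencies and attach them to writes; the CausalContext class and the Python signatures of get_by_version() and put_after() are illustrative, though the operation names come from the paper.

    class CausalContext:
        """Tracks the versions this execution thread has read or written (its dependencies)."""
        def __init__(self, store):
            self.store = store
            self.deps = {}                         # key -> version this thread depends on

        def get(self, key):
            value, version = self.store.get_by_version(key, version="latest")
            self.deps[key] = version               # reading creates a causal dependency
            return value

        def put(self, key, value):
            # put_after tells remote replicas not to apply this write until every
            # listed dependency is visible there (checked via dep_check on that side).
            version = self.store.put_after(key, value, deps=dict(self.deps))
            # The new write transitively captures the old dependencies, so the
            # context can keep only the "nearest" dependency going forward.
            self.deps = {key: version}
            return version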

Both of these systems provide stronger consistency across geographically distributed deployments. The stronger consistency is useful, but future systems will have to deal better with fault tolerance and data loss. Both systems rely on a message bus to deliver asynchronous replication messages, but if a datacenter fails, those in-flight messages are lost, and so is the data. Datacenters do fail, as the Amazon outage showed, and modular datacenters, which build in less reliability, are becoming more popular. Future systems will have to try to prevent data loss in the face of datacenter failures.

Wednesday, October 12, 2011

DryadLINQ: A System for General-Purpose Distributed Data-Parallel Computing Using a High-Level Language and FlumeJava: Easy, Efficient Data-Parallel Pipelines

DryadLINQ is a programming model for large-scale distributed computing that provides imperative and declarative operations on datasets within a traditional high-level language. Programs are written sequentially as transformations on datasets in the .NET framework, and the runtime system executes them in parallel on the Dryad platform. DryadLINQ makes large-scale distributed programming very simple: the programmer writes what looks like an ordinary sequential program, and the system compiles it to run reliably on many machines. By combining imperative and declarative programming, DryadLINQ provides more flexibility than SQL and more optimization opportunities than purely imperative languages. DryadLINQ exposes datasets as enumerable objects, evaluates them lazily, and provides strong static typing. It adds the new operators Apply() and Fork() to LINQ to make more computations parallelizable. Programs are translated to an execution plan graph, which is a DAG, and both static optimizations and dynamic run-time optimizations are performed. The static optimizations are pipelining, removing redundancy, eager aggregation, and I/O reduction; at run time, the system can dynamically re-partition data, for example to optimize sorting. DryadLINQ can leverage PLINQ, which parallelizes programs on multicore machines, and can also use LINQ-to-SQL to interoperate with SQL databases. In experiments, the Terasort benchmark ran with almost constant scaleup, and the overhead of DryadLINQ over custom Dryad programs is about 10%.

FlumeJava is a similar system developed by Google. Writing large-scale parallel programs over massive amounts of data can be very hard, and FlumeJava aims to solve that problem. Many problems need pipelines of MapReduce jobs, so FlumeJava is built around immutable parallel collections that ease programming and allow automatic compilation and optimization into MapReduce jobs. The two central classes are PCollection and PTable, and the main operators are parallelDo(), groupByKey(), combineValues(), and flatten(). Other, more complex operations like count(), join(), and top() can be implemented with these basic operators. The sequence of operators is not executed immediately; evaluation is deferred, so the program forms a DAG of operators. Optimizations on the graph include ParallelDo fusion, MSCR (map-shuffle-combine-reduce) fusion, sinking flattens, and lifting combineValues. At runtime, the system decides whether to run each MSCR operation locally or remotely in parallel, with the associated startup latency. Within Google, FlumeJava is becoming the primary Java-based API for data-parallel computation. The optimizations usually reduce the number of stages by about 5x, and in some cases by 30x. FlumeJava was compared with MapReduce and Sawzall, and for four applications the FlumeJava code was more concise. After optimization, FlumeJava reduced the number of MapReduce stages down to that of hand-optimized MapReduce code, and its running time is very close to the optimal runtime of that hand-optimized code.
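
As a toy illustration of deferred evaluation, the sketch below builds a DAG of operators and only executes it when run() is called; in the real system the DAG is first optimized (fusion, MSCR formation). The Python PCollection class here is a caricature of FlumeJava's Java API, not the API itself.

    class PCollection:
        """Deferred collection: operations record nodes in a DAG, nothing executes yet."""
        def __init__(self, op, parents, fn=None, data=None):
            self.op, self.parents, self.fn, self.data = op, parents, fn, data

        def parallel_do(self, fn):
            return PCollection("parallel_do", [self], fn=fn)

        def group_by_key(self):
            return PCollection("group_by_key", [self])

        def run(self):
            # A real system would optimize the DAG before executing it.
            if self.op == "source":
                return list(self.data)
            inputs = self.parents[0].run()
            if self.op == "parallel_do":
                return [out for x in inputs for out in self.fn(x)]
            if self.op == "group_by_key":
                groups = {}
                for k, v in inputs:
                    groups.setdefault(k, []).append(v)
                return list(groups.items())

    words  = PCollection("source", [], data=["a", "b", "a"])
    counts = words.parallel_do(lambda w: [(w, 1)]).group_by_key()
    print(counts.run())        # [('a', [1, 1]), ('b', [1])]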

The future language for data-intensive computation will look similar to DryadLINQ, FlumeJava, and Pig, combining declarative and imperative programming. The declarative nature of SQL is well defined, but at times it is too restrictive and makes complicated algorithms difficult to express. Programmers are already used to writing programs imperatively, so it makes sense to let developers write large-scale data computations the same way. The functional aspects keep the semantics clean through the concept of immutable datasets. This will allow more developers to easily create larger and more involved computations, which will be a huge advantage. Optimizations will improve and include more cost-based features from traditional databases. Also, compiling down to MapReduce does not always make sense. Dryad supports any DAG of dataflow, and there will probably be other low-level primitives, similar to the MapReduce paradigm, that higher-level programs can target.

Monday, October 10, 2011

Dynamo: Amazon's Highly Available Key-Value Store

Dynamo was developed at Amazon to provide a highly available, scalable, distributed data store for applications that need high reliability. Traditionally, DBMSes are used as data stores, but Amazon determined that most of its applications only need simple key-value access; DBMSes add a lot of extra querying capability and favor consistency over availability and scalability. Dynamo provides an "always on" experience because even the shortest period of unavailability can have significant financial impact. It is always writable, with conflict resolution performed during reads rather than writes. At its base, Dynamo uses consistent hashing with virtual nodes. Data is replicated to the node the key hashes to and then to the next N-1 available successors. Each update creates a new version of the data, tracked with vector clocks, so conflicts can be resolved on reads. Application developers can configure an instance with different values of N, the read quorum R, and the write quorum W, in order to achieve different levels of consistency and availability. The hinted handoff protocol handles the case where a node receives updates for data it is not responsible for: it later tries to hand the updates back to the intended nodes. Experience shows that the 99.9th percentile latency is around 200 ms, but with a lot of variability; buffering helps reduce the variability, at the cost of durability. Experiments also show that inconsistencies do not arise very frequently.
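
Here is a minimal sketch of consistent hashing with virtual nodes and preference lists, the placement scheme described above; the hashing, the ring construction, and the Ring class are simplifications for illustration (no ring membership changes, no hinted handoff), not Dynamo's implementation.

    import bisect, hashlib

    def h(key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    class Ring:
        def __init__(self, nodes, vnodes=64):
            # Each physical node gets `vnodes` positions on the ring (virtual nodes).
            self.ring = sorted((h(f"{n}#{i}"), n) for n in nodes for i in range(vnodes))
            self.positions = [pos for pos, _ in self.ring]

        def preference_list(self, key, n=3):
            """The node the key hashes to, plus the next N-1 distinct successors."""
            start = bisect.bisect(self.positions, h(key)) % len(self.ring)
            prefs, i = [], start
            while len(prefs) < n:
                node = self.ring[i % len(self.ring)][1]
                if node not in prefs:
                    prefs.append(node)
                i += 1
            return prefs

    ring = Ring(["A", "B", "C", "D"])
    print(ring.preference_list("cart:12345", n=3))   # three distinct replicas for this key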

The future will have to rely more on distributing data across many different datacenters. Datacenters are becoming more prevalent and will be the "computers" of the future. How will the Dynamo model map to many geo-diverse datacenters? Low, predictable latencies will still be important, but they will be more difficult to achieve when replicas could be all over the world. Highly scalable and distributed data stores will have to be able to run across many datacenters. Most likely, they will still replicate asynchronously to remote sites to preserve latency, but new techniques will be developed to provide consistency for applications that need it. Also, Dynamo is only a key-value store; future data stores will need to provide richer APIs in order to support other use cases, such as analytics.

Dremel: Interactive Analysis of Web-Scale Datasets

Dremel is a system from Google for highly scalable, interactive, ad hoc analysis of read-only nested data. It uses serving trees and columnar storage, and can run aggregations over trillion-row tables in a few seconds, scaling to thousands of machines and petabytes of data. Interactive response times are important for data exploration and debugging, and analyzing this much data interactively is too difficult with MapReduce, so Dremel aims for low response times. Dremel's sweet spot is aggregation over large-scale data; it is not a replacement for MapReduce. Using a serving tree, partial aggregation can be performed on intermediate results as data streams from the leaf nodes up to the root. Data is stored column-wise, which compresses well and is advantageous when queries project only a small subset of the columns. Dremel works on the data in situ, so there is no loading time and other frameworks can access the same data. Experiments show that with columnar storage, accessing only a few columns can yield orders-of-magnitude speedups, and MapReduce can also benefit from the columnar format. For large aggregations, having more levels in the serving tree results in speedups, since more partial aggregation of intermediate results takes place. Stragglers can be a problem for interactivity, but the system can be configured to return an answer after reading less than 100% of the data.
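
A toy sketch of partial aggregation up a serving tree, showing why deeper trees help large aggregations: each intermediate server merges the partial results of its children before passing them up. The tablet contents and the two-level tree are made up for the example.

    def leaf_scan(tablet):
        """A leaf server scans its tablet (a list of (key, value) rows) and pre-aggregates."""
        partial = {}
        for key, value in tablet:
            partial[key] = partial.get(key, 0) + value
        return partial

    def merge(partials):
        """Intermediate and root servers just merge partial aggregates from below."""
        out = {}
        for p in partials:
            for key, value in p.items():
                out[key] = out.get(key, 0) + value
        return out

    # Two-level serving tree over four tablets: root <- 2 intermediates <- 4 leaves.
    tablets = [[("en", 1), ("de", 1)], [("en", 3)], [("fr", 2)], [("en", 1), ("fr", 1)]]
    leaves = [leaf_scan(t) for t in tablets]
    intermediates = [merge(leaves[:2]), merge(leaves[2:])]
    print(merge(intermediates))     # {'en': 5, 'de': 1, 'fr': 3}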

Dremel is a successful system partly because Google can afford to dedicate thousands of machines to it. High scalability and low latency are achieved by using more machines to process the data, and most other companies will not be able to afford so many machines for their ad hoc processing. Future systems will have to use fewer machines more efficiently while still providing interactivity; that may mean using SSDs or different compression and scanning techniques. Future systems will also have to handle other use cases, such as streaming data and modifiable data.

Tuesday, October 4, 2011

Bigtable: A Distributed Storage System for Structured Data

Bigtable is a flexible, high-performance, scalable distributed storage system for structured data built by Google. Bigtable has a richer data model than a key-value store but does not provide the full relational model. It can scale to petabytes of data and thousands of machines, supports a wide range of applications and configurations, and lets clients control many aspects such as data layout, data format, and data locality. Bigtable is essentially a distributed, sorted map from keys (row string, column string, timestamp) to string values. The keys are lexicographically ordered, and columns are grouped into column families. The architecture has a master, which coordinates all the tablets, and tablet servers, which serve the tablets; clients usually communicate directly with the tablet servers. SSTables, which are ordered, immutable maps stored in GFS, hold the durable data. Writes go to a redo log in GFS, and recent writes are kept in an in-memory memtable. When the memtable fills up, it is flushed and written to a new SSTable, and occasionally a major compaction merges several SSTables into one. Reads are served from the memtable merged with the SSTables. Locality groups are groups of column families that map to their own SSTables, providing vertical partitioning to keep related columns together. A two-pass compression scheme achieves 10:1 compression ratios, better than gzip. Experiments show that Bigtable scales up well, but not linearly, because of load imbalance and saturation of the network between GFS and the tablet servers.
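
A toy sketch of the write/read path described above: writes land in the memtable (after the redo log), full memtables are frozen into immutable sorted files, and a read merges the memtable with those files and returns the newest timestamp. The Tablet class is a simplification for illustration, not Bigtable's implementation.

    class Tablet:
        def __init__(self):
            self.memtable = {}          # (row, column) -> (timestamp, value), recent writes
            self.sstables = []          # immutable, flushed snapshots (stand-ins for SSTables)

        def write(self, row, column, timestamp, value):
            # In the real system the mutation is first appended to a redo log in GFS.
            self.memtable[(row, column)] = (timestamp, value)

        def flush(self):
            """Memtable is full: freeze it into an immutable snapshot and start a new one."""
            self.sstables.append(dict(self.memtable))
            self.memtable = {}

        def read(self, row, column):
            """Merge the memtable and all SSTable snapshots, returning the newest version."""
            candidates = [t[(row, column)] for t in [self.memtable] + self.sstables
                          if (row, column) in t]
            return max(candidates)[1] if candidates else None

    t = Tablet()
    t.write("com.example/index", "contents:", 1, "v1")
    t.flush()
    t.write("com.example/index", "contents:", 2, "v2")
    print(t.read("com.example/index", "contents:"))   # 'v2'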

Bigtable has inspired many other distributed data stores recently. The APIs of these systems are usually limited to simple key-value style access, but not all applications fit the model of single-row access and transactions. I think the future of these systems will be to add more database concepts without significantly harming scalability and availability. Some of these features include transactions, higher-level languages, and different levels of consistency. Bigtable-like systems will look more and more like distributed parallel databases.

Monday, October 3, 2011

SCADS: Scale-Independent Storage for Social Computing Applications

Internet services need to be able to adapt to the load they receive from users. This load can vary wildly as services gain popularity or as natural daily cycles bring periods of low load. In addition to adapting to query load, services should also handle growing data while still providing good response times for users. SCADS tries to solve these issues with data scale independence, effective scaling up and down, and machine learning to predict performance and resource requirements. Data scale independence is a useful property for services because it allows the data to grow without changing the application. SCADS also provides a performance-safe query language that statically analyzes queries and prohibits those that may not do a constant amount of work per user. This means queries must be answered with index lookups whose cost does not depend on the total size of the data, which is achieved by creating indexes.
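
A rough sketch of the "constant work per user" check: a query plan is admitted only if every step is an index lookup with a declared bound on its result size, so cost does not grow with the total data size. The plan representation and the cardinality limit are invented for the illustration.

    MAX_RESULTS_PER_STEP = 100    # hypothetical per-step cardinality bound

    def is_scale_independent(plan):
        """plan: list of steps, e.g. {'op': 'index_lookup', 'limit': 50} or {'op': 'scan'}."""
        for step in plan:
            if step["op"] != "index_lookup":
                return False                              # full scans grow with data size
            if step.get("limit", float("inf")) > MAX_RESULTS_PER_STEP:
                return False                              # unbounded result sets are rejected
        return True

    # "Latest 20 posts by a user's friends" via two bounded index lookups: accepted.
    ok_plan  = [{"op": "index_lookup", "limit": 50}, {"op": "index_lookup", "limit": 20}]
    # "All posts mentioning a word" via a scan over the whole table: rejected.
    bad_plan = [{"op": "scan"}]

    print(is_scale_independent(ok_plan), is_scale_independent(bad_plan))   # True False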

SCADS tries to add new capabilities to large-scale data storage systems. The key additions are the performance-safe query language and the use of machine learning to automatically scale the cluster up and down. In the future, automatic scaling will be very important for larger systems: maintenance costs grow when scaling is a manual process, so effective estimation of resource consumption and automatic scaling will be crucial. Data-scale-independent queries are a useful feature, but at times they may be too restrictive, especially if larger analyses are required. SCADS was built for interactive queries, not large analyses, but future systems will bridge the gap between OLTP and OLAP workloads.


HIVE: Data Warehousing & Analytics on Hadoop -and- Pig Latin: A Not-So-Foreign Language for Data Processing

Lots of data is being generated and stored by companies today, and there is great interest in gaining insight by analyzing all of it. Facebook generates more than 2 TB of new data per day, and storing and analyzing it is too difficult for traditional DBMSes. The MapReduce framework has better scalability and availability than DBMSes, and these properties are usually more important here than the ACID properties of DBMSes. However, writing MapReduce jobs is difficult. Hive is a system built by Facebook that makes querying and managing structured data on Hadoop easier. It supports many different data formats, structured data, common SQL features, and metadata management. Many different types of applications make use of Hive, such as summarization, ad hoc analysis, data mining, spam detection, and more. The data is logically partitioned, then hash partitioned, to provide high scalability. All the metadata is stored in a SQL backend that holds the schemas, the serialization and deserialization libraries, data locations, partition information, and more. The SQL-like interface also allows custom map and reduce scripts, so MapReduce-style jobs can still be expressed.

Pig Latin is similar to Hive in that it also tries to address the difficulty of writing MapReduce jobs. MapReduce is considered too low-level and too rigid, so Pig Latin tries to find the sweet spot between declarative and procedural querying of data, providing ad hoc, read-only querying over large-scale data. Parallel databases are usually too expensive, and SQL is sometimes unnatural; the MapReduce data flow is sometimes too restrictive, and the opaque nature of the map and reduce functions limits optimizations. Pig Latin works by specifying transformations on the data: the programmer defines the sequence of transformations procedurally, but the operators are high-level concepts like filtering, grouping, and aggregation, so there is more opportunity for optimization than with raw MapReduce. Pig Latin uses a nested data model, which is closer to how programmers think, and many use cases already store data in a nested form. Java UDFs are treated as first-class citizens in the system, and only primitives that can be parallelized are allowed, which enables good performance over large amounts of data. Some of the primitives are filter, cogroup, join, foreach, flatten, and store. The system uses lazy evaluation, so execution only happens when it is needed to view or store the data, and optimizations can be performed on the logical plan at execution time. Execution compiles down to Hadoop MapReduce jobs, with the cogroup operations becoming the boundaries between MapReduce jobs. Pig Pen is a debugging framework that generates a sandbox data set so the developer can see example transformations and debug the programs.

Hive and Pig Latin are only two of the programming frameworks for large-scale distributed programs. The common theme is that MapReduce is too low-level, so higher-level languages need to be developed. Unlike how SQL became the standard for relational data querying, I don't think there will be a single standard for large-scale data processing. More and more people are dealing with this data, and they have varying levels of technical skill. Some people are comfortable with very high-level languages like SQL, which can be limited to certain types of data sets and analyses; others may prefer lower levels such as MapReduce. Hive and Pig Latin borrow from declarative query languages to provide a higher level above MapReduce, and they also provide a way to define multiple MapReduce phases for a particular task. Higher-level languages are definitely necessary, but right now it is not clear which model is "better". Also, both compile down to MapReduce jobs, but that isn't necessarily the answer: if the higher-level languages used a more general underlying framework with more low-level primitives, they could provide more optimizations. Forcing tasks to follow the MapReduce model may limit performance for some use cases.