(Section 18.6.3 of EN6)
Our first example of "big data" comes from data warehousing. The usual database storage layout is by row: each row, or record, is kept together on the disk. The file of records may be sorted or not, and records may be allowed to straddle two adjacent disk blocks or not, but the data itself is still organized by row.
In the data warehouse, it is common to maintain a large volume of data for the purpose of business analysis. Typically there are few writes and zero transactions, so the database can be tuned for read operations. Andrew Pole's famous discovery at Target of how to predict which customers were pregnant by the end of their first trimester was made by analyzing warehouse data. The data warehouse may be huge, so that the I/O cost of accessing the data becomes the major bottleneck.
Warehouse data is often characterized by having a large number of columns, perhaps describing large numbers of different products. This means that rows are very long. In some cases, a single row can straddle multiple (traditional-sized) disk blocks. If we are interested in only a small number of columns, as is often the case, then the I/O overhead of accessing those columns by reading in entire rows can be prohibitive.
Another frequent feature of warehouse data is that many columns may be sparse; that is, they contain a majority of null values.
A typical data-warehouse query involves a star schema: one very large fact table, with multiple independent joins to so-called dimension tables. The fact table, for example, might contain fields for customer_id, store_id and part_num; the satellite tables would be used to provide further information on these fields. A wide-row fact table might contain a large number (many hundreds!) of columns for part_nums from different departments.
In a column-store database, each column (or sometimes small(ish) groups of columns, called column families) is stored separately. Typically each column file is stored as a set of pairs ⟨primary_key,column_value⟩; the key may also simply be a row number that is not stored directly. Reassembling an entire row means looking up the key in a large number of column files, but the premise of a column-store organization is that this is seldom done. If only a few columns are needed, access can be faster than with a row-store organization by a factor of ten or more, almost entirely due to the reduced I/O. The column organization can also simply omit null entries in a column, rather than reserving space for them.
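To make the I/O argument concrete, here is a minimal sketch of the two layouts, assuming a toy in-memory table and using "number of stored values touched" as a stand-in for I/O. The table shape, the column names c0..c199 and the counting functions are illustrative assumptions, not any real engine's file format.

```python
from typing import Dict, List

# Toy table (an assumption for illustration): 1,000 rows x 200 int columns.
NUM_ROWS, NUM_COLS = 1000, 200
COLS = [f"c{i}" for i in range(NUM_COLS)]

# Row store: each record's values are kept together.
row_store: List[List[int]] = [
    [r * NUM_COLS + c for c in range(NUM_COLS)] for r in range(NUM_ROWS)
]

# Column store: one "column file" per column. The row number serves as the
# implicit key, so it is not stored.
column_store: Dict[str, List[int]] = {
    name: [row[c] for row in row_store] for c, name in enumerate(COLS)
}

def values_read_row_store(wanted: List[str]) -> int:
    """Values touched when whole rows must be read, however few columns are wanted."""
    return sum(len(row) for row in row_store)

def values_read_column_store(wanted: List[str]) -> int:
    """Values touched when only the requested column files are read."""
    return sum(len(column_store[name]) for name in wanted)

def reassemble(rownum: int) -> List[int]:
    """Rebuilding one full record requires a lookup in every column file."""
    return [column_store[name][rownum] for name in COLS]

wanted = ["c3", "c17", "c42"]
print(values_read_row_store(wanted), values_read_column_store(wanted))  # 200000 3000
```

The `reassemble()` function shows the other side of the trade-off: fetching one complete record touches all 200 column files.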
Columns can also be compressed efficiently, as all the data values tend to have similar "entropy". Rows, with many sharp transitions between string and binary data, are harder to compress.
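One widely used way to exploit this is run-length encoding, which works especially well on sorted or low-cardinality columns (and on sparse, mostly-null ones). The sketch below is a generic illustration of that one technique; it is not the compression scheme of any particular column store, and the store_id data is made up.

```python
from itertools import groupby
from typing import List, Tuple, TypeVar

T = TypeVar("T")

def rle_encode(column: List[T]) -> List[Tuple[T, int]]:
    """Collapse each run of equal adjacent values into (value, run_length)."""
    return [(value, len(list(run))) for value, run in groupby(column)]

def rle_decode(runs: List[Tuple[T, int]]) -> List[T]:
    """Expand (value, run_length) pairs back into the original column."""
    return [value for value, count in runs for _ in range(count)]

# A store_id column from a fact table sorted by store: long runs compress well.
store_ids = [7] * 5 + [12] * 3 + [31] * 4
print(rle_encode(store_ids))  # [(7, 5), (12, 3), (31, 4)]
```

A row, by contrast, alternates between unrelated fields, so adjacent values almost never repeat and this kind of encoding gains little.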
The traditional drawback to column-oriented storage is that assembling a single record can be quite expensive. This is particularly true in distributed databases, where different columns may be on different cluster nodes (think MySQL Cluster here).
In a row-store database, there are things one can do to make it perform more like a column-store database. First, we can create indexes on the necessary columns; an appropriate index on column A (perhaps of the form ⟨primary_key,A⟩) allows retrieval of the column-A data without even touching the main file (that is, such an index is a covering index for column A). Accessing a set of columns with such indexes in a row-store database is theoretically just as efficient as accessing the same columns in a column-store database.
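The covering-index effect can be observed in SQLite, which is a row store: its EXPLAIN QUERY PLAN output reports when a query is answered from an index alone without touching the main table. The table and index names below are made up for illustration, and the exact wording of the plan text varies between SQLite versions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# A row-store table; id is an alias for SQLite's rowid (the primary key).
conn.execute("CREATE TABLE bigtable (id INTEGER PRIMARY KEY, a INT, b INT, c TEXT)")
# Every SQLite index entry also carries the rowid, so this index
# effectively stores <a, primary_key> pairs.
conn.execute("CREATE INDEX idx_a ON bigtable(a)")

# Ask the planner how it would answer a query that needs only id and a.
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT id, a FROM bigtable WHERE a > 5"
).fetchall()
details = [row[-1] for row in plan]   # last field is the readable description
print(details)  # look for "USING COVERING INDEX idx_a" in this text
```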
A database with lots of indexes is relatively expensive to update, but in the data warehouse, update costs matter much less.
Another optimization is to create a materialized view that contains just the columns needed, that is, to create a stored copy of the table that includes only the desired columns. If those columns were A, B, C and Key, we could create the view (in, for example, PostgreSQL or Oracle syntax; a plain create view would not store a copy at all) as
create materialized view ABC as select A, B, C, Key from bigtable;
This, however, adds its own costs, even in a read-only environment: the copy takes additional storage, and it must be refreshed whenever new data is loaded into bigtable. Note that materialized views are a form of denormalization.
The 2008 paper "Column-Stores vs. Row-Stores: How Different Are They Really?", by Abadi, Madden and Hachem, concludes that the multiple-index and materialized-view mechanisms are not all that effective, and that in many cases column-store databases greatly outperform row-store databases. However, as the paper acknowledges, there are many potential implementation differences between two different databases, aside from the organization of the data into rows or columns. See Fig 5 on page 9.
A compromise between row-store and column-store mechanisms is Facebook's Record Columnar File (RCFile) strategy. The idea is first to partition the rows into modestly sized row groups; this is a form of fine-grained automatic sharding. Each row group is meant to fit onto a single block, though the blocksize may be large. The Facebook default blocksize using the Hadoop Distributed File System is 8 MB; blocks as large as 64 MB are not unusual.
At this point each row group is stored on a block, but organized physically by columns. That is, all of column 1 is listed, then all of column 2, etc. The blocksizes are large enough that the compression advantages of column-oriented storage are fully realized. Each column is compressed individually. Actual physical reads may only bring in (or only send across the network) the data from a single column. Lazy decompression is used, so that if a WHERE clause asks
where e.salary > 40000 and e.dno in (1, 2, 4, 5)
then if none of the e.salary values in a block satisfy the first condition, the e.dno column would never be decompressed.
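A minimal sketch of the lazy-decompression idea, assuming a row group is simply a dict of per-column zlib-compressed JSON arrays. This is a stand-in for illustration only; it does not reproduce RCFile's real on-disk format. The e.dno column is decompressed only if at least one salary in the group passes the first test.

```python
import json
import zlib
from typing import Dict, List, Set

def make_row_group(rows: List[dict]) -> Dict[str, bytes]:
    """Store a group of rows column by column, each column compressed separately."""
    columns = rows[0].keys()
    return {col: zlib.compress(json.dumps([r[col] for r in rows]).encode())
            for col in columns}

decompressions: List[str] = []  # records which columns were actually decompressed

def read_column(group: Dict[str, bytes], col: str) -> list:
    decompressions.append(col)
    return json.loads(zlib.decompress(group[col]))

def scan(group: Dict[str, bytes], min_salary: int, dnos: Set[int]) -> List[int]:
    """Row numbers in the group with salary > min_salary and dno in dnos."""
    salaries = read_column(group, "salary")
    hits = [i for i, s in enumerate(salaries) if s > min_salary]
    if not hits:                 # lazy: the dno column is never decompressed
        return []
    dno = read_column(group, "dno")
    return [i for i in hits if dno[i] in dnos]

low_paid = make_row_group([{"salary": 30000, "dno": 1}, {"salary": 35000, "dno": 4}])
print(scan(low_paid, 40000, {1, 2, 4, 5}), decompressions)  # [] ['salary']
```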
If, on the other hand, entire records are needed, then all the fields of a record will be found in the same block.
RCFile is optimized for having very few record updates. New records can easily be appended, however.
Perhaps the most subtle issue for RCFile is that it is optimized for map-reduce operations, discussed below.
Sharding refers to the process of dividing up a table into multiple sets of rows; one set is a "shard". Different shards can then be put on different hosts in a distributed database.
It helps if there is an easy way to tell which shard a given record will be found in. Shards are usually based on some attribute of the records, eg a hash of the key field, or the state or zipcode.
Sharding is a form of horizontal partitioning, but often several tables are sharded. For example, shard 1 might contain the Customer records for Illinois, and also the Invoice records and InvItem records for those customers.
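A sketch of shard routing under two assumed policies: hashing the key field, and sharding by state so that a customer's Invoice and InvItem records land on the same shard as the Customer record. The shard count, the state-to-shard map and the function names are hypothetical. A stable hash (CRC-32) is used rather than Python's built-in hash(), which is randomized per process for strings and so would not give every client the same answer.

```python
import zlib
from typing import Dict

NUM_SHARDS = 4

def shard_by_key(key: str) -> int:
    """Hash-based sharding: the same key always maps to the same shard."""
    return zlib.crc32(key.encode()) % NUM_SHARDS

# Hypothetical state-based policy: all of a state's customers share a shard.
STATE_SHARD: Dict[str, int] = {"IL": 1, "WI": 2, "IN": 3}

def shard_for_customer(state: str) -> int:
    return STATE_SHARD.get(state, 0)   # states not listed go to shard 0

def shard_for_invoice(customer_state: str) -> int:
    # Invoices (and their InvItems) follow their customer, so a query joining
    # Customer, Invoice and InvItem for one customer touches a single shard.
    return shard_for_customer(customer_state)

print(shard_for_customer("IL"), shard_for_invoice("IL"))  # 1 1
```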
Queries that require looking at multiple shards can be troublesome, but also can often be managed away.
Facebook for many years relied on sharded MySQL, and to some extent still does so. Some Facebook data is very amenable to sharding (eg individual user pages); other data is less so. See www.facebook.com/MySQLatFacebook (dynamic) and (for a more analytical approach) www.scalebase.com/mysql-sharding-blog-series-how-to-avoid-re-writing-applications/.
Sharding can be easier if you have some automated basis for doing it, so that the sharding is more-or-less transparent, invisible to the database user. The alternative, sometimes known as "DIY sharding" (DIY = Do It Yourself) usually requires rewriting all your queries to be sharding-aware. That said, queries can sometimes be cheaper and faster if they are sharding-aware.
MySQL Cluster does transparent sharding. For an example illustrating why transparent sharding is sometimes the Right Thing, see MongoDB.
A 1 TB drive now costs about $40 (maybe less; I didn't try very hard, though brand-name drives may be more). At that price, 1 PB is $40K, and Facebook's 1 EB data warehouse is $40 million.
Of course, you still need a building, and power supplies, and server racks and cooling.
A 1 TB solid-state drive is about $300; this makes a petabyte about $300,000. Facebook's 1 EB warehouse might be quite happy on spinning rust, though.
The price of storage, then, is not the real issue. The real issue is how fast you (or FB) can find things in that 1 EB.
MongoDB is web scale!
In practice, a CA system is one that fails whenever a Partition event occurs. Since partitions occur all the time, whether we like it or not, CA is not really an option; we want the database to understand and handle Partition events. CP means Consistent and Partition-tolerant: when a partition does occur, the database intentionally gives up Availability (A is the missing letter). A CA system, when a partition occurs, would simply be down, so again there is no Availability (but perhaps not as cleanly!). We will therefore look only at CP and AP.
Consistency (CP) is in one corner, and represents the Old Guard. This is mostly the SQL world, though there are a few NoSQL CP databases.
The alternative is AP: the database survives Partition events, and remains available, but the two pieces may no longer be consistent. AP databases make up the majority of the NoSQL world.
Consider DNS (the domain-name system) as a big database; it is DNS that converts names like "pld.cs.luc.edu" to IP addresses. DNS is arguably the most distributed database in the world. It is distributed both technically and administratively. In DNS, we do not have consistency. (In DNS, updates are relatively infrequent. Each update comes with a cache time; non-authoritative DNS servers can serve up old information as long as it is within its cache time; data older than that must be re-acquired from the authoritative server.)
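The caching rule in the parenthesis can be sketched as a small TTL cache. The CachingResolver class, the dict standing in for the authoritative server, the injected fake clock and the TEST-NET addresses (192.0.2.x) are all illustrative assumptions; this is not how any real DNS server is implemented.

```python
from typing import Callable, Dict, Tuple

class CachingResolver:
    """A non-authoritative server: answers from cache until the cache time expires."""

    def __init__(self, authoritative: Dict[str, str], ttl: float,
                 clock: Callable[[], float]):
        self.authoritative = authoritative   # stands in for the authoritative server
        self.ttl = ttl
        self.clock = clock
        self.cache: Dict[str, Tuple[str, float]] = {}  # name -> (address, expiry)

    def resolve(self, name: str) -> str:
        now = self.clock()
        hit = self.cache.get(name)
        if hit is not None and now < hit[1]:
            return hit[0]                    # possibly stale, but within its cache time
        address = self.authoritative[name]   # re-acquire from the authoritative source
        self.cache[name] = (address, now + self.ttl)
        return address

now = [0.0]                                  # a fake clock, in seconds
zone = {"pld.cs.luc.edu": "192.0.2.10"}      # hypothetical address
resolver = CachingResolver(zone, ttl=300, clock=lambda: now[0])
resolver.resolve("pld.cs.luc.edu")           # fills the cache at t = 0
zone["pld.cs.luc.edu"] = "192.0.2.20"        # the authoritative record changes
now[0] = 100.0
print(resolver.resolve("pld.cs.luc.edu"))    # 192.0.2.10 (stale, within the cache time)
now[0] = 301.0
print(resolver.resolve("pld.cs.luc.edu"))    # 192.0.2.20 (expired, re-acquired)
```

Once updates stop, every cache converges to the authoritative value within one cache time, which is the eventual-consistency property discussed next.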
One solution is eventual consistency: when connectivity returns, we resolve conflicting transactions. DNS does have eventual consistency. The problem case, of course, is when two transactions involve updates to the same item. Note that for the Facebook wall example above, there aren't really any conflicting transactions (unless you count this: A posts, B comments on A's post, A deletes his/her post without seeing B's. But a related form of this happens all the time even when the data is consistent: A posts, B comments, A deletes without ever having refreshed the page).
The worst situation is when Amazon has one book left, and two people have just bought it. We can guarantee this won't happen with a central database (or even a separate central database for each of several disjoint sets of products, eg DB[N] is for all items where SKU mod 137 = N). But eventual consistency is not good here: it leads to scenarios where the last copy is sold twice.
Note, however, that while eventual consistency may not be "good" here, it may in fact be good enough. What should Amazon do if the last copy is sold twice? They can just tell one of the buyers that, too bad, the item they ordered turned out to be out of stock. This happens often enough anyway, due to discrepancies between the online inventory and the physical inventory.
In other words, eventual consistency may not be ideal, but it may be a workable arrangement.
Formally, eventual consistency means that if no new updates occur, then eventually all accesses will return the last-updated value, or the winner of the conflicting-update resolution. DNS does this.
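One common conflict-resolution rule is "last writer wins": each replica tags a value with a timestamp, and when connectivity returns, the higher timestamp wins (ties broken here by replica name). This is only one possible policy, chosen for illustration; the Replica class, the reconcile() function, the keys and the timestamps are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Tuple

Versioned = Tuple[int, str, str]   # (timestamp, writer, value)

@dataclass
class Replica:
    name: str
    store: Dict[str, Versioned]

    def write(self, key: str, value: str, ts: int) -> None:
        self.store[key] = (ts, self.name, value)

    def read(self, key: str) -> str:
        return self.store[key][2]

def reconcile(a: Replica, b: Replica) -> None:
    """When connectivity returns, both replicas adopt the winning version of each key."""
    for key in set(a.store) | set(b.store):
        versions = [s[key] for s in (a.store, b.store) if key in s]
        winner = max(versions)   # highest timestamp wins; writer name breaks ties
        a.store[key] = winner
        b.store[key] = winner

east, west = Replica("east", {}), Replica("west", {})
# During a partition, both sides update the same item.
east.write("price:book42", "19.99", ts=10)
west.write("price:book42", "17.99", ts=12)
reconcile(east, west)
print(east.read("price:book42"), west.read("price:book42"))  # 17.99 17.99
```

Note that last-writer-wins silently discards the losing update; for the Amazon inventory case below, that is exactly how the last copy can end up sold twice.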
There is another, sometimes unstated, goal for the Amazon inventory system: low latency. Users who experience lengthy delays when they click "buy" may become rapidly dissatisfied.
There is another issue here between CP and AP databases: in CP databases, availability suffers during momentary disconnections. AP databases usually introduce some kind of alternative model of consistency, and so "pure" consistency is violated "most of the time". What is gained in return is lower latency. This was addressed in a blog post by Daniel Abadi (and later in a paper), who (somewhat facetiously) proposed the acronym PACELC:
To me, CAP should really be PACELC ---
if there is a partition (P)
how does the system tradeoff between availability and consistency (A and C);
else (E) when the system is running as normal in the absence of partitions,
how does the system tradeoff between latency (L) and consistency (C)?
Note the else clause: if we're going to give up consistency in the face of a partition, it very well might make sense to continue to give it up in order to better handle network latency. In other words, the delays implicit in "eventual" consistency may be due to a partition and may also be due to ordinary network latency.
Many NoSQL databases relax consistency not so much because of the rare Partition events but to maintain low latency.
A basic feature of a distributed database is data replication to multiple sites (where a site is a set of nodes at the same location, some geographical distance from other sites). If replication is not done, then in the event of a partition isolating site A, neither of sites B and C will have any access to A's data. Alternatively, suppose A replicates its data to B, B replicates its data to C and C replicates its data to A. Then, in the event of the isolation of any one site, the other two sites have full access to all the records.
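The A → B → C → A arrangement can be checked mechanically. This sketch assumes each site holds its own data plus the data replicated to it, and that the two non-isolated sites can still reach each other; the dict and function names are illustrative.

```python
from typing import Dict, Set

# Each site replicates its data to the next: A -> B, B -> C, C -> A.
REPLICATE_TO: Dict[str, str] = {"A": "B", "B": "C", "C": "A"}

def holdings(site: str) -> Set[str]:
    """Whose data a site holds: its own plus any replicated to it."""
    return {site} | {src for src, dst in REPLICATE_TO.items() if dst == site}

def accessible(reachable_sites: Set[str]) -> Set[str]:
    """Data visible to the sites that can still talk to each other."""
    return set().union(*(holdings(s) for s in reachable_sites))

for isolated in "ABC":
    rest = set("ABC") - {isolated}
    print(isolated, sorted(accessible(rest)))  # always ['A', 'B', 'C']
```

Without replication, holdings(s) would be just {s}, and isolating A would leave {B, C} with no access to A's data.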
But replication is slow.
There are several strategies for replication. One is for each replica to be updated in real time, in lock-step with the master node. This offers the greatest consistency. Another approach is for sites to send replica updates "asynchronously", so the replica node will often be slightly behind. (There is also the approach of updating the replica during widely spaced "batch" updates, but that is only useful for non-real-time data.)
Note that, if A is involved in a transaction that refers to the data for which A is the master node, B, as the replication site for A's data, may also be involved in a transaction that refers to that same data. Consistency issues arise when B sells the last copy of a book, only to receive an update from A showing that A has sold it too. These problems can be reduced if B has to do all transactions on A's data through A (unless there is an actual partition), but this increases latency.
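A toy model of asynchronous replication, assuming each site keeps its own copy of the inventory count and queues its updates for the other site, to be shipped later. The Site class, the SKU and the queue-based design are hypothetical; the point is only that both sites can sell the last copy before either hears from the other.

```python
from collections import deque
from typing import Deque, Dict, Tuple

class Site:
    def __init__(self, name: str, stock: Dict[str, int]):
        self.name = name
        self.stock = dict(stock)
        self.outbox: Deque[Tuple[str, int]] = deque()  # updates not yet shipped
        self.sales = 0

    def sell(self, sku: str) -> bool:
        """Sell from the local copy; replication happens later, asynchronously."""
        if self.stock.get(sku, 0) <= 0:
            return False
        self.stock[sku] -= 1
        self.sales += 1
        self.outbox.append((sku, -1))
        return True

    def ship_updates_to(self, other: "Site") -> None:
        while self.outbox:
            sku, delta = self.outbox.popleft()
            other.stock[sku] = other.stock.get(sku, 0) + delta

a = Site("A", {"book42": 1})   # A is the master for book42
b = Site("B", {"book42": 1})   # B holds a replica
print(a.sell("book42"), b.sell("book42"))    # True True: the last copy is sold twice
a.ship_updates_to(b)
b.ship_updates_to(a)
print(a.stock["book42"], b.stock["book42"])  # -1 -1
```

Routing B's sales of A's data through A (the synchronous alternative in the text) would make the second sell() fail, at the cost of a round trip to A.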
The goal is for the system to have an awareness of where the data is located. Operations on a given collection of data are scheduled, where possible, on the same LAN as the data, avoiding backbone traffic. Hadoop is generally "rack-aware", really meaning it knows when two nodes are on the same high-speed switch.
Smaller-scale Hadoop setups have one master node (CPU) and multiple "worker" nodes that are both DataNodes (repositories of data) and also TaskTracker nodes. Data is replicated on multiple nodes; the default degree of replication is 3.
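The HDFS documentation describes its default placement for a replication factor of 3 roughly as follows: one replica on the writer's node (if the writer is a DataNode), a second on a node in a different rack, and the third on a different node in that same remote rack. The sketch below imitates that rule over a made-up cluster map; it picks the first eligible node instead of a random one, and it is not Hadoop's code.

```python
from typing import Dict, List

# Hypothetical cluster topology: node -> rack (every rack has at least 2 nodes).
TOPOLOGY: Dict[str, str] = {
    "n1": "rack1", "n2": "rack1", "n3": "rack1",
    "n4": "rack2", "n5": "rack2",
    "n6": "rack3", "n7": "rack3",
}

def place_replicas(writer: str) -> List[str]:
    """Choose 3 DataNodes for a new block written from `writer`."""
    first = writer                                    # local node: no network hop
    remote = [n for n in TOPOLOGY if TOPOLOGY[n] != TOPOLOGY[first]]
    second = remote[0]                                # a node on another rack
    third = next(n for n in TOPOLOGY                  # same remote rack, other node
                 if TOPOLOGY[n] == TOPOLOGY[second] and n != second)
    return [first, second, third]

print(place_replicas("n2"))  # ['n2', 'n4', 'n5']
```

Two of the three copies share a rack, which limits cross-rack write traffic, while the block still survives the loss of an entire rack.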
Hadoop is designed to support a lot more reading than writing. Examples might include Google's search index, Facebook pages or Amazon inventory. In June 2012, Facebook had a 100 petabyte Hadoop installation, growing at 0.5 PB/day.
The Hadoop Filesystem is just that. Most ⟨key,value⟩ stores will be implemented as Hadoop tables. Note, however, that any filesystem is a ⟨key,value⟩ store where keys are filenames and values are file contents.
MapReduce is a large-data paradigm developed at Google and based on a common functional-language operation. The original overview paper by Dean and Ghemawat is mapreduce-2004.pdf. Not every Hadoop installation runs MapReduce, but this is the "native" data-analysis tool. The MapReduce engine has one master JobTracker node, which receives MapReduce tasks from clients and organizes them onto TaskTracker nodes. TaskTracker nodes generally run a small number (typically 4) of MapReduce tasks.
The idea behind MapReduce is that the programmer must supply three components:
The map() operation -- which is where most of the parallelization occurs -- usually takes a ⟨key,value⟩ pair and returns a list of related ⟨key,value⟩ pairs. A more specific description of the interface is
map (in_key, in_value) -> list(out_key, intermediate_value)
The out_keys in a single list need not be the same.
A reduce() operation typically takes a single out_key, and a list of all (or some of) the intermediate_values associated with that out_key. The result is a list of output values.
Here is one of Google's standard examples of MapReduce. This example counts the occurrence of each word in a set of documents.
map(String doc_name, String doc_body):
    // doc_name (in_key): document name
    // doc_body (in_value): actual document contents
    for each word w in doc_body:
        EmitIntermediate(w, 1);

reduce(String word, Iterator intermediate_values):
    // word: a word
    // intermediate_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
        result += v;
    Emit(AsString(result));
The map() operations generate long lists of words with a count of 1. The reduce() phase is applied to all the map() results with the same key (that is, all map() results for the same word). It is up to the shuffler to send all instances of the same word to the same reducer.
Notice also that the ⟨key,value⟩ input to map() is ⟨filename,file_contents⟩, quite different from the ⟨key,value⟩ pairs generated as output.
Here's an example in which we assume that the map() operations actually count all the individual words of each document:
Documents:
doc1: here is a list of words
doc2: list of words with words in list and another list
doc3: here here here is words and words and another words
Map outputs (one line per document):
(here, 1) (is, 1) (a, 1) (list, 1) (of, 1) (words, 1)
(list, 3) (of, 1) (words, 2) (with, 1) (in, 1) (and, 1) (another, 1)
(here, 3) (is, 1) (words, 3) (and, 2) (another, 1)
After the shuffle (words appearing in only one document, namely a, with and in, are not shown):
(here, 1) (here, 3)
(is, 1) (is, 1)
(list, 1) (list, 3)
(of, 1) (of, 1)
(words, 1) (words, 2) (words, 3)
(and, 1) (and, 2)
(another, 1) (another, 1)
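The reduce tasks then add up the counts in each list, giving (here, 4) (is, 2) (list, 4) (of, 2) (words, 6) (and, 3) (another, 2), along with (a, 1), (with, 1) and (in, 1). Note how the MapReduce infrastructure must take the results returned by the mappers and reorganize them by key (shuffle them) to feed into the reducers.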
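Here is a single-process Python rendering of the word-count job on the three documents above, as a sketch only: map_doc() plays the mappers (counting within each document, as the example assumes), shuffle() groups the intermediate pairs by key, and reduce_word() sums. These function names are illustrative and do not come from Hadoop or Google's library.

```python
from collections import Counter, defaultdict
from typing import Dict, Iterable, List, Tuple

docs = {
    "doc1": "here is a list of words",
    "doc2": "list of words with words in list and another list",
    "doc3": "here here here is words and words and another words",
}

def map_doc(doc_name: str, doc_body: str) -> List[Tuple[str, int]]:
    """Mapper: emit (word, count) for each distinct word in one document.
    doc_name is the in_key; this mapper does not need it."""
    return list(Counter(doc_body.split()).items())

def shuffle(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Group all intermediate values by key, so one reducer sees one word."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for word, count in pairs:
        groups[word].append(count)
    return groups

def reduce_word(word: str, counts: List[int]) -> int:
    """Reducer: add up the per-document counts for one word."""
    return sum(counts)

intermediate = [pair for name, body in docs.items() for pair in map_doc(name, body)]
totals = {w: reduce_word(w, c) for w, c in shuffle(intermediate).items()}
print(totals["words"], totals["here"], totals["list"])  # 6 4 4
```

In a real cluster, each map_doc() call and each reduce_word() call could run on a different node; only shuffle() needs to see data from every mapper.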
Search is another operation that lends itself to a MapReduce formulation. The map()s just output the records containing the search value; the reduce() just consolidates the map output.
Reverse web-link graph: the map() searches source pages and outputs a ⟨target,source⟩ pair each time a link to target is found on page source. The reduce operation concatenates all ⟨target,source⟩ pairs with the same target into a pair ⟨target,list(source)⟩.
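A sketch of the reverse web-link graph job, assuming the links have already been extracted from each source page (real HTML parsing is omitted). The page names are made up, and the grouping loop stands in for the shuffle.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Hypothetical pages: source page -> pages it links to.
pages: Dict[str, List[str]] = {
    "a.html": ["b.html", "c.html"],
    "b.html": ["c.html"],
    "c.html": ["a.html"],
}

def map_links(source: str, targets: List[str]) -> List[Tuple[str, str]]:
    """Emit (target, source) for every link to target found on page source."""
    return [(target, source) for target in targets]

def reduce_links(target: str, sources: List[str]) -> Tuple[str, List[str]]:
    """Concatenate all sources pointing at target into (target, list(source))."""
    return (target, sorted(sources))

grouped: Dict[str, List[str]] = defaultdict(list)   # the shuffle step
for src, outlinks in pages.items():
    for target, source in map_links(src, outlinks):
        grouped[target].append(source)

reverse = dict(reduce_links(t, s) for t, s in grouped.items())
print(reverse["c.html"])  # ['a.html', 'b.html']
```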
A fair bit of machinery is needed to organize the map() outputs in a way that can be fed into the reduce() inputs; this is the shuffle stage. This shuffle, though, is usually a generic operation, and usually easily parallelizable. Typically this phase uses extensive hashing on intermediate_key values: each map() output pair ⟨ikey,ivalue⟩ is immediately put into bucket hash(ikey) by the system. Then, in the reduce() phase, ⟨ikey,ivalue⟩ pairs with the same intermediate_key in one bucket are fed to one reduce() task; of course, that bucket will contain all the ⟨ikey,ivalue⟩ pairs with ikey = intermediate_key. The system keeps track of the order of intermediate_key values in one bucket, for easier later sorting.

The RCFile storage strategy above was fundamentally an attempt to optimize storage for MapReduce operations. Each block (think 8 MB) gets one mapper thread.
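The hash-bucketing step of the shuffle can be sketched as follows, assuming R reduce tasks numbered 0..R-1 and CRC-32 as a stable hash. (Hadoop's default partitioner does something similar using Java's hashCode(), but this is not its code.) Within each bucket the keys are sorted, so a reducer can process one key's values at a time.

```python
import zlib
from collections import defaultdict
from typing import Dict, List, Tuple

def partition(key: str, num_reducers: int) -> int:
    """Send every pair with the same key to the same reducer."""
    return zlib.crc32(key.encode()) % num_reducers

def shuffle(pairs: List[Tuple[str, int]],
            num_reducers: int) -> List[Dict[str, List[int]]]:
    """Bucket map output by hash(key), then group by key within each bucket."""
    buckets: List[Dict[str, List[int]]] = [defaultdict(list) for _ in range(num_reducers)]
    for key, value in pairs:
        buckets[partition(key, num_reducers)][key].append(value)
    # Sorting the keys within a bucket lets a reducer stream one key at a time.
    return [dict(sorted(b.items())) for b in buckets]

map_output = [("here", 1), ("is", 1), ("list", 3), ("here", 3), ("words", 2), ("is", 1)]
for r, bucket in enumerate(shuffle(map_output, num_reducers=3)):
    print(r, bucket)
```

Because partition() depends only on the key, no coordination between mappers is needed to guarantee that all values for one key meet at one reducer.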
Hive (http://hive.apache.org/) is a database application built on top of Hadoop. Facebook was a major contributor; here is a Facebook blog on Hive, a paper on Hive, and a slideshow. The fundamental goal of Hive is to provide a familiar SQL-like user interface (known as Hive QL, or HQL) for making queries that will be implemented with MapReduce. Hive is a "batch" system, not meant for interactive queries; even small queries may take several minutes.
Facebook uses Hive for its data warehouse.
Hive supports ⟨key,value⟩ tables with indexes. It also supports a range of data types (eg large-file data types such as text files, subtables, and binary files). Tables may contain partition columns (sometimes "virtual" columns) that provide tags for partitioning the table among multiple nodes; in the data-definition language this is indicated by a PARTITIONED BY clause. Tables within one node may be organized into buckets, eg by date or userid; these in turn are indicated by a CLUSTERED BY clause. Here is an example table declaration from https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-WhatisHive:
CREATE TABLE page_view(viewTime INT, userid BIGINT,
        page_url STRING, referrer_url STRING,
        friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
        ip STRING COMMENT 'IP Address of the User')
    COMMENT 'This is the page view table'
    PARTITIONED BY(dt STRING, country STRING)
    CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
    ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '\001'
        COLLECTION ITEMS TERMINATED BY '\002'
        MAP KEYS TERMINATED BY '\003'
    STORED AS SEQUENCEFILE;
Unlike many relational systems, Hive supports a primitive array type; note the friends field above.
Hive's primary feature is that it supports an SQL-like query language, HQL. It is not as general as SQL, but HQL queries are very unlikely to run excessively slowly on huge data. HQL queries are translated by Hive into MapReduce jobs. (According to Facebook, their Analytics users had trouble writing MapReduce queries themselves, which is not surprising.) In many ways, this feature of Hive (that it is a front end for map-reduce, allowing queries to be written in a more familiar fashion) is its most important.
For each of these operations, creation of the appropriate mappers is relatively straightforward. For GROUP BY, the mappers output the GROUP BY attributes as the output key (the "w" in the example above). Joins can be implemented either in the map side or the reduce side; in either case, the first table listed should be the larger table (often much larger). For reduce-side joins, the join attribute is the output key from the mappers; the reducers then connect each value with the corresponding records in the second table. The overall effect is similar to a Hash Join. For a map-side join, the second table has to fit in memory; each mapper carries out the join on its set of first-table records and the shufflers and reducers simply consolidate everything.
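A sketch of the map-side join described above, assuming the second table is small enough to load into an in-memory dict in every mapper. The table contents and field layout are invented for illustration; Hive's actual map-join implementation is not shown here.

```python
from typing import Dict, Iterator, List, Tuple

# Small "second" table, loaded in full by every mapper (must fit in memory).
departments: Dict[int, str] = {1: "Research", 4: "Administration", 5: "Headquarters"}

# Large "first" table, split across mappers; here just two splits of (name, dno).
employee_splits: List[List[Tuple[str, int]]] = [
    [("Smith", 5), ("Wong", 5)],
    [("Zelaya", 4), ("Doe", 7)],   # dno 7 has no matching department
]

def map_join(split: List[Tuple[str, int]],
             dept: Dict[int, str]) -> Iterator[Tuple[str, str]]:
    """Join each first-table record against the in-memory table (inner join)."""
    for name, dno in split:
        if dno in dept:
            yield (name, dept[dno])

# The reduce phase only needs to consolidate the mappers' outputs.
result = [row for split in employee_splits for row in map_join(split, departments)]
print(result)
```

A reduce-side join would instead have mappers emit (dno, record) pairs from both tables and let the shuffle bring matching dno values to the same reducer, just as words were grouped in the word-count example.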
Query output always goes into an intermediate table, rather than to the console. You can do this in SQL, too.
Facebook reports a fair number of events where one of the Hive systems
was effectively unavailable due to an inefficient or otherwise overly slow