(Section 18.6.3 of EN6)
Our first example of "big data" comes from data warehousing. The usual database storage layout is by row: each row, or record, is kept together on the disk. The file of records may be sorted or not, and records may be allowed to straddle two adjacent disk blocks or not, but the data itself is still organized by row.
In the data warehouse, it is common to maintain a large volume of data for the purpose of business analysis. Typically there are few writes and zero transactions; the database can be tuned for read operations. Andrew Pole's famous discovery at Target of how to predict which customers were pregnant by the end of their first trimester was done by analyzing warehouse data. Another example is Wal*Mart's discovery one Black Friday that a certain computer was selling well at one store; Wal*Mart was able to get the same computer featured and put on sale at other stores so quickly that per-store sales of the computer at those stores caught up to the first store by the end of the day. The data warehouse is typically huge, meaning that the I/O costs of accessing the data become the major bottleneck.
Warehouse data is often (though not always) characterized by having a large number of columns, perhaps describing large numbers of different products. This means that rows are very long. In some cases, a single row can straddle multiple (traditional-sized) disk blocks. If we are interested in only a small number of columns, as is often the case, then the I/O overhead to access those columns by reading in entire rows can be prohibitive.
Another frequent feature of warehouse data is that many columns may be sparse; that is, a majority of their values are null.
A typical data-warehouse query involves a star schema: one very large fact table, with multiple independent joins to so-called dimension tables. The fact table, for example, might contain fields for customer_id, store_id and part_num; the dimension tables would be used to provide further information on these fields. A wide-row fact table might contain a large number (many hundreds!) of columns for part_nums from different departments.
In a column-store database, each column (or sometimes small(ish) groups of columns, called column families) is stored separately. Typically each column file is stored as a set of pairs ⟨primary_key,column_value⟩; the key may also simply be a row number that is not stored explicitly. To reassemble an entire row means looking up the key in a large number of column files, but the point of a column-store organization is that this is seldom necessary. If only a few columns are needed, as is often the case, access can be faster than in a row-store organization by a factor of ten or more, almost entirely due to the reduced I/O. The column organization can also simply omit null entries, saving space in sparse columns.
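As a toy illustration (in Python, with invented data, and not modeled on any particular product), here is the difference between keeping each record together and keeping each column together:

rows = [                                   # row-store: each record kept together
    {"id": 1, "name": "Alice", "dept": 5, "salary": 52000},
    {"id": 2, "name": "Bob",   "dept": 5, "salary": 48000},
    {"id": 3, "name": "Carol", "dept": 7, "salary": 61000},
]

columns = {                                # column-store: one <key,value> file per column
    "name":   {1: "Alice", 2: "Bob", 3: "Carol"},
    "dept":   {1: 5, 2: 5, 3: 7},
    "salary": {1: 52000, 2: 48000, 3: 61000},
}

# Column access: only the salary "file" is touched.
total = sum(columns["salary"].values())

# Reassembling row 2 means one lookup per column file.
row2 = {col: vals[2] for col, vals in columns.items()}
print(total, row2)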
Columns can also be compressed efficiently, as all the data values tend to have similar "entropy". Rows, with many sharp transitions between string and binary data, are harder to compress.
The traditional drawback to column-oriented storage is that assembling a single record can be quite expensive. This is particularly true in distributed databases, where different columns may be on different cluster nodes (think MySQL Cluster here).
In a row-store database, there are things one can do to make it perform more like a column-store database. First, we can create indexes on the necessary columns; an appropriate index on column A (perhaps of the form ⟨primary_key,A⟩) allows retrieval of the column-A data without even touching the main file (that is, such an index is a covering index for column A). Accessing a set of columns with such indexes in a row-store database is theoretically just as efficient as accessing the same columns in a column-store database.
A database with lots of indexes is relatively expensive to update, but in the data warehouse, update costs matter much less.
Another optimization is to create a materialized view that contains just the columns needed; that is, to create a stored copy of the table restricted to the desired columns. If those columns were A, B, C and Key, we could create the view as
create materialized view ABC as select A, B, C, Key from bigtable;
This, however, adds its own costs, even in a read-only environment. Note that materialized views are a form of denormalization.
The 2008 paper Column-Stores vs. Row-Stores: How Different Are They Really?, by Abadi, Madden and Hachem (here) concludes that the multiple-index and materialized-view mechanisms are not all that effective, and that in many cases column-store databases greatly outperform row-store databases. However, as this paper acknowledges, there are many potential implementation differences between two different databases, aside from data organization into rows and columns. See Fig 5 on page 9.
See also BigTable, below.
A compromise between row-store and column-store mechanisms is Facebook's Record Columnar File (RCFile) strategy. The idea is first to partition the rows into modestly sized row groups; this is a form of fine-grained automatic sharding. Each row group is meant to fit on a single block, though the blocksize may be large: the Facebook default blocksize using the Hadoop Distributed File System is 8 MB, and blocks as large as 64 MB are not unusual.
At this point each row group is stored on a block, but organized physically by columns. That is, all of column 1 is listed, then all of column 2, etc. The blocksizes are large enough that the compression advantages of column-oriented storage are fully realized. Each column is compressed individually. Actual physical reads may only bring in (or only send across the network) the data from a single column. Lazy decompression is used, so that if a WHERE clause asks
where e.salary > 40000 and e.dno in (1, 2, 4, 5)
then if none of the e.salary values in a block satisfy the first condition, the e.dno column would never be decompressed.
If, on the other hand, entire records are needed, then all the fields of a record will be found in the same block.
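Here is a rough Python sketch of the row-group idea, with zlib standing in for the real per-column compressor and all names and sizes invented:

import json, zlib

def make_row_group(rows):
    # within one row group, store and compress each column separately
    cols = {name: [r[name] for r in rows] for name in rows[0]}
    return {name: zlib.compress(json.dumps(vals).encode()) for name, vals in cols.items()}

def read_column(group, name):
    # decompress only the one column actually requested
    return json.loads(zlib.decompress(group[name]))

rows = [{"eno": i, "salary": 30000 + 100 * i, "dno": i % 6} for i in range(1000)]
group = make_row_group(rows)

# Lazy decompression: look at salary first; touch dno only if some salary qualifies.
salaries = read_column(group, "salary")
if any(s > 40000 for s in salaries):
    dnos = read_column(group, "dno")
    print(sum(1 for s, d in zip(salaries, dnos) if s > 40000 and d in (1, 2, 4, 5)))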
RCFile is optimized for use cases involving very few record updates. New records can easily be appended, however.
Perhaps the most subtle issue for RCFile is that it is optimized for map-reduce operations, below.
Sharding refers to the process of dividing up a table into multiple sets of rows; one set is a "shard". Different shards can then be put on different hosts in a distributed database.
There is a cute story about the origin of this term; see mjtsai.com/blog/2020/06/05/the-origin-of-database-sharding. Supposedly it comes from the developers of the game Ultima Online (and the need for database sharding to handle all the users):
The evil wizard Mondain had attempted to gain control over Sosaria by trapping its essence in a crystal. When the Stranger at the end of Ultima I defeated Mondain and shattered the crystal, the crystal shards each held a refracted copy of Sosaria.
However, in the Ultima Online setting the "shards" appear to be separate game worlds, rather than one unified world spread over multiple databases.
When sharding, it helps if there is an easy way to tell which shard a given record will be found in. Shards are usually based on some attribute of the records, eg a hash of the key field, or the state or zipcode.
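A minimal sketch of hash-based shard selection; the shard count and key format here are invented:

import hashlib

NUM_SHARDS = 8                       # hypothetical cluster size

def shard_for(key: str) -> int:
    # the shard is computable from the key alone, so no lookup table is needed
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_SHARDS

print(shard_for("customer:10452"))   # always maps to the same shard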
Sharding is a form of horizontal partitioning, but often several tables are sharded. For example, shard 1 might contain the Customer records for Illinois, and also the Invoice records and InvItem records for those customers.
Queries that require looking at multiple shards can be troublesome, but often they can be managed away.
Facebook for many years relied on sharded MySQL, and to some extent still does so. Some Facebook data is very amenable to sharding (eg individual user pages); other data is less so. See www.facebook.com/MySQLatFacebook (dynamic) and (for a more analytical approach) www.scalebase.com/mysql-sharding-blog-series-how-to-avoid-re-writing-applications/.
Sharding can be easier if you have some automated basis for doing it, so that the sharding is more-or-less transparent, invisible to the database user. The alternative, sometimes known as "DIY sharding" (DIY = Do It Yourself) usually requires rewriting all your queries to be sharding-aware. That said, queries can sometimes be cheaper and faster if they are sharding-aware.
MySQL Cluster does transparent sharding. For an example illustrating why transparent sharding is sometimes the Right Thing, see MongoDB.
Sharding works best when most common queries can be done on one shard. Cross-shard queries are slow, and can easily be catastrophically slow. One example might be to put each Facebook user's timeline on one shard. That works best when combined with some denormalization, duplicating timeline content as necessary (see normalization.html#denormalization). For posts that get assigned to two (or more) user timelines, making a copy for each applicable shard allows the sharding to be efficient.
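Here is a hypothetical sketch of that fan-out (the shard count, the hashing, and the post format are all invented): the post is copied once per applicable timeline, so reading any one timeline stays on a single shard.

import hashlib

NUM_SHARDS = 8
timeline_shards = {n: [] for n in range(NUM_SHARDS)}      # stand-ins for real shard servers

def shard_for(key: str) -> int:
    return int(hashlib.md5(key.encode()).hexdigest(), 16) % NUM_SHARDS

def publish(post, usernames):
    # one copy of the post per timeline, each stored on that user's own shard
    for user in usernames:
        timeline_shards[shard_for("user:" + user)].append({"owner": user, **post})

publish({"id": 1071, "text": "hello"}, ["alice", "bob"])   # possibly two different shards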
Now consider Uber's database, consisting of driver records and rider records. When driver Alice and passenger Bob are both in Chicago, it makes sense to shard by city code. But what if Bob travels to Boston and gets a ride with Charlie? In this case, it might make sense to insert a "visiting passenger" record for Bob in the Boston shard, so that visitingBob and Charlie can be updated together. Once Bob has been away from Boston for a while, the visitingBob record can be deleted, or folded into Bob's Chicago records. Recall that Uber has location data on everyone, and also that Uber updates its location records very frequently (maybe more often than once a minute), but new ride-request records typically occur once every few hours.
As a final example, consider quantity_in_stock updates at Amazon. It
would make sense to divide the part numbers among multiple databases,
because we have seen that quantity_in_stock updates can be expensive. Which
database a given part belongs to should be readily computable from the part number, to avoid doing
a lookup. A transaction will have to be highly shard-aware.
A 2 TB spinning-iron drive now costs about $50 (maybe less; I didn't try very hard, though brand-name drives may be more). At that price, 1 PB is $25K, and Facebook's 1 EB data warehouse is $25 million.
Of course, you still need a building, and power supplies, and server racks and cooling.
A 1 TB solid-state drive is about $300; this makes a petabyte about $300,000. Facebook's 1 EB warehouse might be quite happy on spinning rust, though.
The real point is that the price of storage is not the issue; the issue is how fast you (or FB) can find things in that 1 EB.
There is currently a movement that suggests that the days of relational databases are waning, and that emerging needs to manage very large databases mean that new approaches must be found. Most often, the idea is that SQL mandates support for too many inefficient operations: searches on non-indexed fields, joins involving those fields, multiple keys, etc.
A typical NoSQL database supports tables that allow lookup via one primary key column, but cannot necessarily be joined to other tables, or searched by other columns. Such a structure -- sometimes called a ⟨key,value⟩ store -- looks something like a giant version of a Java HashMap, though often with multiple data columns.
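Schematically (a toy in-memory sketch, not any particular product's API), the interface looks like this:

class KVStore:
    # a toy <key,value> store: lookup by primary key only
    def __init__(self):
        self._data = {}                  # in reality: a distributed, persistent map

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)

store = KVStore()
store.put("user:42", {"name": "Alice", "city": "Chicago"})
print(store.get("user:42"))
# Finding all users in Chicago would mean scanning every value -- exactly the sort
# of non-key search that SQL supports but a plain <key,value> store does not.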
Not everyone is in agreement with the NoSQL manifesto: see I Can't Wait for NoSQL to Die, by Ted Dziuba (archived here). Also note that, more and more, NoSQListas claim that their acronym stands for "Not only SQL", as if to recognize that SQL and Relationalism will always have a role to play. This is a big change: from NoSQL as less than SQL -- for efficiency -- to NoSQL as more than SQL. The examples in the post all use a format more in keeping with the original map/reduce style. The counter-argument is that any underlying mechanism (here map/reduce) can be generated with an SQL-style input language by appropriate processing, and that we should stick with the input language everyone is familiar with.
The EdgeDB team takes the view that SQL is simply a dated language, with too many obsolete features and too many odd restrictions. They have a point; see their blog post at edgedb.com/blog/we-can-do-better-than-sql. But the changes they introduce in their own query language, EdgeQL, might also be introduced into SQL.
EdgeDB doesn't exactly help their case by complaining that NULLs introduce problems into SQL. The difficulty with NULLs is fundamental, and not due to the language. EdgeDB makes everything a set, and so NULLs are the empty set, but this still leaves plenty of nonintuitive cases.
Finally, all RDBMSs make performance tradeoffs. MySQL might be faster at one thing, Postgres at another. There are a variety of "NoSQL" databases that are basically RDBMSs that have been optimized for one particular relational-database function. If that function doesn't need full support for SQL, then a NoSQL database might be an appropriate choice.
The first step away from the traditional single-server DB model is to partition the data among multiple servers. Such a cluster can be all at one site, or can be distributed across multiple sites. For the moment, let's just consider the former.
If the cluster only needs to support reads (for example, serving website html and other files), then clustering is easy. It makes sense as a form of load balancing.
Clusters with writes are more complicated. One approach is to allow only the master server to write data; the other, "slave" servers accept read requests only. This is not as broken as it sounds; at many sites, writes are straightforward updates and new-record insertions; complex queries are usually read-only and these are exactly the ones that take time. The usual implementation is known as log shipping. For recovery purposes, Postgres writes every database change to the write-ahead log, or WAL. If one or more clone databases are created, they can be kept up-to-date by processing the WAL data. The data can be sent in blocks (more efficient, but more subject to stale reads), or via streaming. See postgresql.org/docs/9.5/static/warm-standby.html.
With a little alchemy, it is supposedly possible to ensure that all the slave servers return consistent data; that is, once the master commits a write, subsequent reads from any replica will see it. See brandur.org/postgres-reads for details.
Multi-master clusters are possible, but sometimes depend on the kinds of transactions being executed. Sometimes different servers are master for some tables and slave for others. If the same table is replicated at two servers, and there are parallel updates, then these have to be resolved somehow.
There remains no general-purpose, out-of-the-box strategy for configuring
a database spread over multiple servers, with each server taking a portion
of the load. A wide variety of solutions have been proposed, but each
applies only in a specific case. Configuring shared, geographically
separated databases (distributed databases) is even harder.
In practice, a CA system would be one that simply fails whenever a Partition event occurs. Since partitions occur all the time, whether we like it or not, CA is not really an option; we want Partition events to be understood and handled by the database. CP means Consistent and Partition-tolerant: when a partition does occur, the DB intentionally gives up Availability, the missing letter. (In a CA system, when a partition occurs, the system is simply down, so we again have no Availability, though perhaps not as cleanly!) We will look at only the latter two combinations: CP and AP.
Consistency (CP) is in one corner, and represents the Old Guard. This is mostly the SQL world, though there are a few NoSQL CP databases.
The alternative is AP: the database survives Partition events, and remains available, but the two pieces may no longer be consistent. AP databases make up the majority of the NoSQL world.
Consider DNS (the domain-name system) as a big database; it is DNS that
converts names like "pld.cs.luc.edu" to IP addresses. DNS is arguably the
most distributed database in the world. It is distributed both technically
and administratively. In DNS, we do not
have consistency. (In DNS, updates are relatively infrequent. Each update
comes with a cache time; non-authoritative DNS servers can serve up old
information as long as it is within its cache time; data older than that
must be re-acquired from the authoritative server.)
One solution is eventual consistency: when connectivity returns, we resolve conflicting transactions. DNS does have eventual consistency. Eventual consistency works well when each separate database is responsible for (is master of) a designated portion of the data. In this case there can never be conflicts.
The problem case for eventual consistency is when two transactions
running on different servers involve updates to the same item. Note that
for the Facebook wall example above, there aren't really any conflicting
transactions (unless you count this: A posts, B comments on A's post, A
deletes his/her post without seeing B's. But a related form of this
happens all the time even when the data is consistent: A posts,
B comments, A deletes without ever having refreshed the page).
The worst situation is when Amazon has one book left, and two people have
just bought it. We can guarantee this won't happen with a central database
(or even a separate central database for each of several disjoint sets
of products, eg DB[N] is for all items where SKU mod 137 = N).
But eventual consistency is not good here: it leads to scenarios where the
last copy is sold twice.
Note, however, that while eventual consistency may not be "good" here, it
may in fact be good enough. What
should Amazon do if the last copy is sold twice? They can just tell one of
the buyers that, too bad, the item they ordered turned out to be out of
stock. This happens often enough anyway, due to discrepancies between the
online inventory and the physical inventory.
In other words, eventual consistency may not be ideal, but it may be a
workable arrangement.
Formally, eventual consistency means that if no new
updates occur, then eventually all accesses will return the last-updated
value, or the winner of the conflicting-update resolution. DNS does this.
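One common (if lossy) policy for the conflicting-update resolution is "last write wins". Here is a toy Python sketch, with invented keys and timestamps; real systems may instead use vector clocks or application-level merging.

# Two sites' copies of the same keys after a partition; each value carries a timestamp.
site_A = {"book:1831": ("in_stock", 102.5), "cust:7": ("gold", 101.0)}
site_B = {"book:1831": ("sold_out", 103.1), "cust:9": ("new", 100.2)}

def reconcile(a, b):
    # last write wins: for each key, keep whichever version has the later timestamp
    merged = dict(a)
    for key, (val, ts) in b.items():
        if key not in merged or ts > merged[key][1]:
            merged[key] = (val, ts)
    return merged

print(reconcile(site_A, site_B))   # book:1831 resolves to the later write, "sold_out"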
There is another, sometimes unstated, goal for the Amazon inventory system: low latency. Users who experience lengthy delays when they click "buy" may become rapidly dissatisfied. Amazon is probably a lot more worried about lost sales due to excessive delays than having to send the occasional "we know we confirmed your purchase but it turns out it's out of stock after all" email.
There is another issue here between CP and AP databases: in CP databases, availability suffers during momentary disconnections. AP databases usually introduce some kind of alternative model of consistency, and so "pure" consistency is violated "most of the time". What is gained in return is lower latency. This was addressed in a blog post by Daniel Abadi (and later in a paper), who (somewhat facetiously) proposed the acronym PACELC:
To me, CAP should really be PACELC ---
if there is a partition (P)
how does the system tradeoff between availability and
consistency (A and C);
else (E) when the system is running as normal in the absence of
partitions,
how does the system tradeoff between latency (L) and
consistency (C)?
Note the else clause: if we're going to give up consistency in the face of a partition, it very well might make sense to continue to give it up in order to better handle network latency. In other words, the delays implicit in "eventual" consistency may be due to a partition and may also be due to ordinary network latency.
Many NoSQL databases relax consistency not so much because of rare Partition events, but to maintain low latency.
A basic feature of a distributed database is data replication to multiple sites (where a site is a set of nodes at the same location, some geographical distance from other sites). If replication is not done, then in the event of a partition isolating site A, neither of sites B and C will have any access to A's data. Alternatively, suppose A replicates its data to B, B replicates its data to C and C replicates its data to A. Then, in the event of the isolation of any one site, the other two sites have full access to all the records.
But replication is slow.
There are several strategies for replication. One is for each replica to be updated in real time, in lock-step with the master node. This offers the greatest consistency. Another approach is for sites to send replica updates "asynchronously", so the replica node will often be slightly behind. (There is also the approach of updating the replica during widely spaced "batch" updates, but that is only useful for non-real-time data.)
Note that, if A is involved in a transaction that refers to the data for which A is the master node, B, as the replication site for A's data, may also be involved in a transaction that refers to that same data. Consistency issues arise when B sells the last copy of a book, only to receive an update from A. These consistency problems can be reduced if B has to do all transactions on A's data through A (unless there is an actual partition), but this increases latency.
For as long as RDBMSs have been around, DB managers have engaged in occasional "denormalization" to avoid a particular join for performance reasons. One example might be a TOTAL field in the INVOICE table: the total for a specific invoice_num is calculated by finding all items ordered, as listed in INVITEM:
select sum(quantity*price) from INVITEM ii where ii.invnum = invoice_num;
This is a relatively expensive join and search! Storing a TOTAL field in the INVOICE table introduces a potential inconsistency, but it is also rather common practice, for performance reasons.

A goal in Hadoop-style systems is for the system to have an awareness of where the data is located. Operations on a given collection of data are scheduled, where possible, on the same LAN as the data, avoiding backbone traffic. Hadoop is generally "rack-aware", really meaning it knows when two nodes are on the same high-speed switch.
Smaller-scale Hadoop setups have one master node (CPU) and multiple "worker" nodes that are both DataNodes (repositories of data) and also TaskTracker nodes. Data is replicated on multiple nodes; the default degree of replication is 3.
Hadoop is designed to support a lot more reading than writing. Examples might include Google's search index, Facebook pages or Amazon inventory. In June 2012, Facebook had a 100 petabyte Hadoop installation, growing at 0.5 PB/day.
The Hadoop Filesystem (HDFS) is just that: a filesystem. Most ⟨key,value⟩ stores will be implemented as Hadoop tables. Note, however, that any filesystem is a ⟨key,value⟩ store where keys are filenames and values are file contents.
MapReduce is a large-data paradigm developed at Google and based on a common functional-language operation. The original overview paper by Dean and Ghemawat is mapreduce-2004.pdf. Not every Hadoop installation runs MapReduce, but this is the "native" data-analysis tool. The MapReduce engine has one master JobTracker node, which receives MapReduce tasks from clients and organizes them onto TaskTracker nodes. TaskTracker nodes generally run a small number (typically 4) of MapReduce tasks.
The idea behind MapReduce is that the programmer must supply the following components:
The map() operation -- which is where most of the parallelization occurs -- usually takes a ⟨key,value⟩ pair and returns a list of related ⟨key,value⟩ pairs. A more specific description of the interface is
map(in_key, in_value) -> list(out_key, intermediate_value)
The out_keys in a single list need not be the same.
A reduce() operation typically takes a single out_key, and a list of all (or some of) the intermediate_values associated with that out_key. The result is a list of output values.
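In the same style as the map() interface above (and roughly following the Dean and Ghemawat paper), the reduce() interface is

reduce(out_key, list(intermediate_value)) -> list(output_value)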
Here is one of Google's standard examples of MapReduce. This example counts the occurrence of each word in a set of documents.
map(String doc_name, String doc_body):
    // doc_name (in_key): document name
    // doc_body (in_value): actual document contents
    for each word w in doc_body:
        EmitIntermediate(w, 1);

reduce(String word, Iterator intermediate_values):
    // word: a word
    // intermediate_values: a list of counts
    int result = 0;
    for each v in intermediate_values:
        result += v;
    Emit(AsString(result));
The map() operations generate long lists of words with a count of 1. The reduce() phase is applied to all the map() results with the same key (that is, all map() results for the same word). It is up to the shuffler to send all instances of the same word to the same reducer.
Notice also that the ⟨key,value⟩ input to map() is ⟨filename,file_contents⟩, quite different from the ⟨key,value⟩ pairs generated as output.
Here's an example in which we assume that the map() operations actually count all the individual words of each document:
doc1: here is a list of words
doc2: list of words with words in list and another list
doc3: here here here is words and words and another words
Map() outputs:
(here, 1) (is, 1) (a, 1) (list, 1) (of, 1) (words, 1)
(list, 3) (of, 1) (words, 2) (with, 1) (in, 1) (and, 1) (another, 1)
(here, 3) (is, 1) (words, 3) (and, 2) (another, 1)
Reduce inputs:
(here, 1) (here, 3)
(is, 1) (is, 1)
(a, 1)
(list, 1) (list, 3)
(of, 1) (of, 1)
(words, 1) (words, 2) (words, 3)
(with, 1)
(in, 1)
(and, 1) (and, 2)
(another, 1) (another, 1)
The reduce tasks then add up the counts for each word, giving the totals below. Note how the MapReduce infrastructure must take the results returned by the mappers and reorganize them by key (that is, shuffle them) to feed into the reducers.
here 4
is 2
a 1
list 4
of 2
words 6
with 1
in 1
and 3
another 2
Search is another operation that lends itself to MapReduce formulation. The map()s just output records containing the value; the reduce() just consolidates the map output.
Reverse web-link graph: the map() searches source pages and outputs a ⟨target,source⟩ pair each time a link to target is found on page source. The reduce operation concatenates all ⟨target,source⟩ pairs with the same target into a pair ⟨target,list(source)⟩.
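A toy, single-process Python version of this example (with ordinary dictionaries standing in for the MapReduce machinery, and an invented link graph) might look like:

from collections import defaultdict

pages = {                                   # source page -> links it contains (invented data)
    "a.html": ["x.html", "y.html"],
    "b.html": ["x.html"],
    "c.html": ["y.html", "x.html"],
}

def map_links(source, targets):
    return [(target, source) for target in targets]    # emit <target, source> per link

mapped = [pair for src, tgts in pages.items() for pair in map_links(src, tgts)]

shuffled = defaultdict(list)                # shuffle: group pairs by the intermediate key
for target, source in mapped:
    shuffled[target].append(source)

reduced = {target: sources for target, sources in shuffled.items()}
print(reduced)   # {'x.html': ['a.html', 'b.html', 'c.html'], 'y.html': ['a.html', 'c.html']}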
A fair bit of machinery is needed to organize the map() outputs in a way that can be fed into the reduce() inputs; this is the shuffle stage. This shuffle, though, is usually a generic operation, and usually easily parallelizable. Typically this phase uses extensive hashing on intermediate_key values: each map() output pair ⟨ikey,ivalue⟩ is immediately put into bucket hash(ikey) by the system. Then, in the reduce() phase, ⟨ikey,ivalue⟩ pairs with the same intermediate_key in one bucket are fed to one reduce() task; of course, that bucket will contain all the ⟨ikey,ivalue⟩ pairs with ikey = intermediate_key. The system keeps track of the order of intermediate_key values in one bucket, for easier later sorting.
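A few lines of Python illustrate the bucket assignment; the number of reduce tasks R here is arbitrary:

import zlib

R = 4                                       # arbitrary number of reduce tasks

def reducer_for(ikey: str) -> int:
    return zlib.crc32(ikey.encode()) % R    # stable hash of the key -> bucket number

for word in ["here", "words", "list", "here"]:
    print(word, "-> reducer", reducer_for(word))   # "here" lands on the same reducer both times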
The RCFile storage strategy above was fundamentally an attempt to optimize storage for MapReduce operations. Each block (think 8 MB) gets one mapper thread.

The name MongoDB comes from humongous, itself a mashup of "huge", "monstrous" and possibly "stupendous". MongoDB is arguably the NoSQL DB with the largest installed base (though that makes it only the fifth largest overall in 2014, after Oracle, MySQL, SQL Server and PostgreSQL; see here).
Every MongoDB record (or document) has a fixed 12-byte identifier, of type ObjectID. Beyond that, data can be rather free-form, and often is set up in a format that, in a relational system, would be considered highly denormalized. Here is an example from tutorialspoint.com/mongodb/mongodb_data_modeling.htm. A system is being built that includes user posts.
In Postgres, we might model this as a table post(id, title, description, url), a table tag(post_id, tag) and a table comments(post_id, comment_id, username, message). If we also recorded a list of likes for each comment, instead of just a count, we'd need a fourth table.
MongoDB would put this all in one table (or Collection, as Mongoistas call them):
{
   _id: POST_ID,                  -- the ObjectID
   title: TITLE_OF_POST,
   description: POST_DESCRIPTION,
   by: POST_BY,
   url: URL_OF_POST,
   tags: [TAG1, TAG2, TAG3],      -- can be arbitrarily long
   likes: TOTAL_LIKES,            -- a count
   comments: [                    -- again, an arbitrarily long list (shown here with two entries)
      {
         user: 'COMMENT_BY',
         message: TEXT,
         dateCreated: DATE_TIME,
         like: LIKES
      },
      {
         user: 'COMMENT_BY',
         message: TEXT,
         dateCreated: DATE_TIME,
         like: LIKES,
         rank: RANKING             -- we dynamically add a new field
      }
   ]
}
Note that we have an array of tags and an array of comments. The like fields here represent counts; if we wanted to record the IDs of the likers, we'd need a subarray.
Retrieval of records like this is very fast by the _id field. MongoDB also supports searching by other fields.
At least some of the appeal of MongoDB is that this sort of complete denormalization seems natural. However, the above MongoDB schema could also be implemented in Postgres (which does support arrays, though not as fully; they are harder to search, for example).
But denormalization isn't always the benefit it appears to be. In the normalized Postgres layout, to retrieve one user's aggregated data, eg to display on that user's page, we'd need to retrieve the post from the post table, then the list of comments from the comment table, etc. With the right indexes, these should all be fast. If we're displaying just one user, we do not need a join, just three lookups.
The other problem with denormalization is that the MongoDB approach here does not perform well if we want to include the comments on the home page of each commenting user (perhaps with the original post). With MongoDB, we have to search each record of each comments list for the user field, and we probably do not have an index to speed this up. With Postgres, it would be very natural to create an index on comment.username.
MongoDB supports automatic sharding on user-defined shard keys. A single shard runs on a master host with possibly multiple slave hosts replicating the data. MongoDB can in some sense be pigeonholed as a document database with high-quality support for replication and sharding.
The MongoDB people claim their product works well for distributed systems with 100 - 1,000 nodes.

Hive (http://hive.apache.org/) is a database application built on top of Hadoop. Facebook was a major contributor; here is a Facebook blog on Hive, a paper on Hive, and a slideshow. The fundamental goal of Hive is to provide a familiar SQL-like user interface (known as Hive QL, or HQL) for making queries that will be implemented with MapReduce. Hive is a "batch" system, not meant for interactive queries; even small queries may take several minutes.
Facebook uses Hive for its data warehouse.
Hive supports ⟨key,value⟩ tables with indexes. It also supports additional data types (eg large-file data types such as text files, subtables, and binary files). Tables may contain partition columns (sometimes "virtual" columns) that provide tags for partitioning the table among multiple nodes; in the data-definition language this is indicated by a PARTITIONED BY clause. Tables within one node may be organized into buckets, eg by date or userid; these in turn are indicated by a CLUSTERED BY clause. Here is an example table declaration from https://cwiki.apache.org/confluence/display/Hive/Tutorial#Tutorial-WhatisHive:
CREATE TABLE page_view(viewTime INT, userid BIGINT,
                page_url STRING, referrer_url STRING,
                friends ARRAY<BIGINT>, properties MAP<STRING, STRING>,
                ip STRING COMMENT 'IP Address of the User')
COMMENT 'This is the page view table'
PARTITIONED BY(dt STRING, country STRING)
CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
ROW FORMAT DELIMITED
        FIELDS TERMINATED BY '1'
        COLLECTION ITEMS TERMINATED BY '2'
        MAP KEYS TERMINATED BY '3'
STORED AS SEQUENCEFILE;
Unlike in standard relational systems, a primitive array type is supported; note the friends field above.
Hive's primary feature is that it supports an SQL-like query language, HQL. It is not as general as SQL, but HQL queries are very unlikely to run excessively slowly on huge data. HQL queries are translated by Hive into MapReduce queries. (According to Facebook, their Analytics users had trouble writing MapReduce queries themselves, which is not surprising.) In many ways, this feature of Hive -- that it is a front-end for map-reduce, allowing queries to be written in a more familiar fashion -- is its most important.
HQL supports the usual SQL-style operations, including GROUP BY and joins.
For each of these operations, creation of the appropriate mappers is relatively straightforward. For GROUP BY, the mappers output the GROUP BY attributes as the output key (the "w" in the example above). Joins can be implemented either on the map side or the reduce side; in either case, the first table listed should be the larger table (often much larger). For reduce-side joins, the join attribute is the output key from the mappers; the reducers then connect each value with the corresponding records in the second table. The overall effect is similar to a Hash Join. For a map-side join, the second table has to fit in memory; each mapper carries out the join on its set of first-table records, and the shufflers and reducers simply consolidate everything.
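Here is a toy single-process sketch of a reduce-side join, with invented page_view and user data and Python dictionaries standing in for the shuffle:

from collections import defaultdict

page_views = [(1, "a.html"), (2, "b.html"), (1, "c.html")]   # (userid, page)  -- invented data
users      = [(1, 25), (2, 32)]                              # (userid, age)

# map: tag each record with its source table; the join attribute (userid) is the key
mapped  = [(uid, ("V", page)) for uid, page in page_views]
mapped += [(uid, ("U", age))  for uid, age  in users]

groups = defaultdict(list)                  # shuffle: group records by the join key
for uid, tagged in mapped:
    groups[uid].append(tagged)

joined = []
for uid, records in groups.items():         # reduce: combine the two sides, like a hash join
    views = [v for tag, v in records if tag == "V"]
    ages  = [v for tag, v in records if tag == "U"]
    joined += [(uid, page, age) for page in views for age in ages]

print(joined)   # [(1, 'a.html', 25), (1, 'c.html', 25), (2, 'b.html', 32)]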
Query output always goes into an intermediate table, rather than to the console. You can do this in SQL, too.
Facebook reports a fair number of events where one of the Hive systems
was effectively unavailable due to an inefficient or otherwise overly slow
query.