I would like an implementation of item:item similarity capable of web-appropriate response times (< 100ms) under a web-scale load:
What is known about dealing with loads of this size:
This is an "if I had my druthers" document. With unlimited time, this is how I would implement such a system. To be fair, this is also quite a bit of how Matt would implement such a system since my ideas on distributed databases have been strongly influenced through discussions with him on the subject.
The basic underlying technology that doesn't exist is a scalable functional tuple manipulation language. I'm going to describe a program in that language as well as the optimizations I would expect the engine for that language to make. Since I don't have time to implement a language of this sort, the actual program I write will be a single instance.
This is the same as if I described the actions and optimizations of a C compiler, but didn't have time to write the compiler, so just hand compiled the code. It is still easier to read and conceptualize the C than the assembly.
Modern DBMSs take a program-agnostic view of data representation. The DBMS is told the types and relationships of the data, and it allows client programs to connect and query that data. The specific issues that most DBMSs do not handle well are:
Driven in part by the rise in popularity of Google's Map-Reduce algorithm, a new form of processing known as Data Intensive Scalable Computing (DISC) has been developing. In Automatic Optimization of Parallel Dataflow Programs, Yahoo! Research describes DISC versus DBMS:
In developing automatic optimization techniques for DISC, we can draw upon many ideas and techniques from the database community, which has studied set-oriented data processing for decades, including parallel processing on clusters. While DISC systems bear a strong resemblance to parallel query processors in database management systems (DBMS), the context is somewhat different: The DBMS context emphasizes highly declarative languages, normalized data and strong consistency, whereas DISC is geared toward procedural code, flexible data models, and cost-effective scalability through weak consistency and commodity hardware.
Traditional DBMS optimization techniques [18] are model-based, i.e., they search for “optimal” execution strategies over models of the data, operators and execution environment. In the DISC context, accurate models may not be available a priori, because:
- Data resides in plain files for ease of interoperability with other tools, and the user may not instruct the system how to parse the data until the last minute;
- Many of the operators are governed by custom user-supplied code whose cost and data reduction/blowup properties are not known a priori;
- DISC uses large pools of unreliable and perhaps heterogeneous machines, and formulating simple and accurate models of the execution environment is a challenge.
Starting around 2000, motivated in part by considerations related to the ones stated above, the database community has begun studying adaptive approaches to one optimization problem: query plan selection. Adaptive query planning does not rely on the a-priori existence of accurate models, and instead adopts a trial-and-error, feedback-driven approach.
With adaptive query processing and cluster awareness, DISC moves toward the processing model that I would like, but unlike DISC I am willing to do away with generalizability in order to optimize for speed. It is not unreasonable that a company running a recommendation engine will have a set of computers dedicated to that specific purpose. The system can be structured toward optimally delivering those results at the cost of producing slower results for other operations.
The basic idea is that this program is "compiled" into a procedure for distributing and managing data across a cluster of multicore map-reducing tuple stores. The inputs and the outputs are defined, but none of the intermediate steps is required to generate any particular structure in the datastore. The query optimizer has complete freedom and will generally be driving toward three goals:
sum or sizeof) and within nodes with map-reduce.
public set items is { string key asin, string name }
public set users is { string key username, unique string emailAddress }
public append only set views is { reference to items as item,
reference to users as user,
timestamp viewTime }
// aggregate the views into view counts
set counts is { reference to items as item,
reference to users as user,
sizeof(views[.item = item and .user = user]) as count }
// cull outliers (not really loving this syntax)
number mean is mean(counts.count)
number deviation is deviation(counts.count)
filter counts on .count > mean - 3 * deviation and .count < mean + 3 * deviation
// compute the lengths of the vectors
set lengths is { reference to items as item,
sqrt(sum(counts[.item = item].count ^ 2)) as length }
// how to represent a join? what if user was defined outside this definition?
// when do I need other types of joins and natural?
set dotProducts is { reference to items as itemOne,
reference to items as itemTwo,
sum(counts[.item = itemOne and .user = user].count *
counts[.item = itemTwo and .user = user].count) as product }
// the cosines are the similarity and are public to other programs
public set similarity is { reference to items as itemOne,
reference to items as itemTwo,
dotProducts[.itemOne = itemOne and .itemTwo = itemTwo].product /
(lengths[.item = itemOne].length *
lengths[.item = itemTwo].length) as similarity }
// because I know that access will generally be ordered by similarity, I
// tell the system that so that it can optimize for that
order similarity by .similarity
When this data is actually accessed, say to get the ten most similar items, the query would look like:
use set 'tdf://localhost/amazon/similarity' as similarity
set items is similarity[.itemOne.asin = 'WKJHE-2324Q']
order items by .similarity
set topTen is items[(1:10)].itemTwo
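For comparison, here is a minimal single-process Python sketch of the same pipeline. It is only an illustration of what the program above computes, not the distributed engine described here. The function names (`similarity_table`, `top_similar`), the use of population deviation, the guard for a zero deviation, and the assumption that "top ten" means highest similarity first are all mine.

```python
import math
from collections import Counter, defaultdict


def similarity_table(views, cull_sigma=3.0):
    """views: iterable of (asin, username) pairs, one pair per view."""
    # counts: number of views per (item, user)
    counts = Counter(views)
    if not counts:
        return {}

    # cull outliers more than cull_sigma deviations away from the mean count
    values = list(counts.values())
    mean = sum(values) / len(values)
    deviation = math.sqrt(sum((v - mean) ** 2 for v in values) / len(values))
    if deviation > 0:  # assumption: with no spread there is nothing to cull
        lo = mean - cull_sigma * deviation
        hi = mean + cull_sigma * deviation
        counts = {key: c for key, c in counts.items() if lo < c < hi}

    # lengths: Euclidean length of each item's vector of per-user counts
    by_user = defaultdict(dict)
    squares = defaultdict(float)
    for (item, user), c in counts.items():
        by_user[user][item] = c
        squares[item] += c * c
    lengths = {item: math.sqrt(s) for item, s in squares.items()}

    # dotProducts: summed over the users who viewed both items
    dots = defaultdict(float)
    for item_counts in by_user.values():
        for a, ca in item_counts.items():
            for b, cb in item_counts.items():
                if a != b:
                    dots[(a, b)] += ca * cb

    # similarity: cosine of the angle between the two count vectors
    return {(a, b): d / (lengths[a] * lengths[b]) for (a, b), d in dots.items()}


def top_similar(similarity, asin, k=10):
    """Return up to k items most similar to asin, most similar first."""
    rows = [(s, b) for (a, b), s in similarity.items() if a == asin]
    rows.sort(reverse=True)
    return [b for _, b in rows[:k]]
```

Everything here is recomputed from scratch on each call, which is exactly what the distributed design is meant to avoid; the sketch is only useful as a reference for checking results.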
The first line represents a set definition:
public set items is { string key asin, string name }
So the system needs some way to represent these strings and distribute them. The method used by Aura is to have a binary trie with the branches distributed across multiple agents. This method is:
The downside is that the mapping of strings onto nodes is immutable; there's no capacity to shuffle them around. When doing a map-reduce, the shuffle step moves all the map outputs with a common reduce key to the same node, because the reduction has to be done within a single process. If I know ahead of time what map-reduce operations I will be doing and I have flexible storage, then I can make sure the elements start off grouped together on a node so less time is spent in the shuffle.
This ability requires, however, that data storage decisions are more flexible than what a distributed binary trie offers.
Matt has offered an interesting solution that makes a lot of sense: Bloom filters. Say, for example, that I would like to create the storage for the asin string. It is labeled as a key, which means it is unique across the store, i.e., attempting to insert it requires verifying that it isn't in the cluster yet. Imagine that the cluster allows any value to be stored on any node. Each node can maintain a Bloom filter for that storage requirement, and instead of doing n searches, I check n Bloom filters (which is significantly faster) and only do ε searches (for the false positives on the Bloom filters).
The probability of a false positive on an m-bit Bloom filter covering n items with k hash functions is:
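Assuming the k hash functions are independent and uniformly distributed over the m bits (the usual idealization), the standard estimate is:

```latex
p = \left(1 - \left(1 - \frac{1}{m}\right)^{kn}\right)^{k} \approx \left(1 - e^{-kn/m}\right)^{k}
```

For fixed m and n, the number of hash functions that minimizes p is approximately k = (m/n) ln 2.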
Recall that because the storage location is flexible, the size of the filter and the number of stored elements can be tweaked for individual nodes. This means that the load placed on heterogeneous machines can be adapted to the resources available for that machine and the load on the cluster overall.
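A minimal Python sketch of the uniqueness check described above, with a differently sized filter on each node. The `BloomFilter`, `Node` and `insert_unique` names, the SHA-256 double-hashing scheme and the in-memory `set` standing in for a node's real storage are all assumptions made for illustration.

```python
import hashlib
import math


class BloomFilter:
    """Fixed-size Bloom filter; bit positions come from double hashing."""

    def __init__(self, m_bits, k_hashes):
        self.m = m_bits
        self.k = k_hashes
        self.bits = bytearray((m_bits + 7) // 8)

    def _positions(self, value):
        digest = hashlib.sha256(value.encode("utf-8")).digest()
        h1 = int.from_bytes(digest[:8], "big")
        h2 = int.from_bytes(digest[8:16], "big") | 1  # force an odd stride
        return [(h1 + i * h2) % self.m for i in range(self.k)]

    def add(self, value):
        for pos in self._positions(value):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, value):
        return all(self.bits[pos // 8] & (1 << (pos % 8))
                   for pos in self._positions(value))


class Node:
    """Hypothetical storage node: a key set guarded by its own filter."""

    def __init__(self, m_bits, k_hashes):
        self.filter = BloomFilter(m_bits, k_hashes)
        self.keys = set()
        self.searches = 0  # number of expensive lookups actually performed

    def store(self, key):
        self.keys.add(key)
        self.filter.add(key)

    def search(self, key):
        self.searches += 1
        return key in self.keys


def insert_unique(nodes, target, key):
    """Store key on target unless some node in the cluster already holds it."""
    for node in nodes:
        # Only pay for a real search when the filter answers "maybe".
        if node.filter.might_contain(key) and node.search(key):
            return False
    target.store(key)
    return True


def false_positive_rate(m_bits, n_items, k_hashes):
    """Approximate false positive probability for the given sizing."""
    return (1.0 - math.exp(-k_hashes * n_items / m_bits)) ** k_hashes
```

A node with less memory can be given a smaller filter and fewer keys, and `false_positive_rate` gives a rough way to check that its share of wasted searches stays acceptable.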
At this point, all that is known is that I have a bunch of strings that I am going to search through. I know nothing about how they will be accessed, so there's no reason to distribute them in any manner other than evenly across the store. Maybe the nodes have reported their size and speed when joining the store, and distribution is made proportional to those characteristics to reduce network latency. At this point though, we will assume equal sized and powered nodes.
Note that there are two strings being stored: asin and name. We know nothing about the data access characteristics of these tuples, so we will likely just distribute both elements of the pair to the same store. There are situations, though, where an operation on one column is joined on a different characteristic than an operation on another column, so the groupings for a map-reduce will be different. In this situation, the elements of a tuple could end up on different nodes.
public append only set views…
A note on the idea of an append only set. You're going to want to cache information to speed up access. Caches require space and processing power to access. If I know that a set item will never be updated or deleted, that means that its element in the cache can never be invalidated unless the item actually changes which node it is stored on. This means a completely different node can maintain the cache with no communication to the node where the element is actually stored.
It simply allows another level of distribution and parallelization of processing in the system.
set counts is { reference to items as item,
reference to users as user,
sizeof(views[.item = item and .user = user]) as count }
Another feature of this language is that the semantics of joins are handled internally. I tell the system that I want the tuples to be joined and it handles creating whatever unique identifiers and indices are necessary to do that quickly.
One method for doing this is to add a uuid element to the tuple. UUIDs are then indexed for quick access. This uuid is then stored by the same method as the strings, using Bloom filters.
In the last line, there is finally a computation:
sizeof(views[.item = item and .user = user])
Certain types of operations have special qualities that affect optimizations. sizeof happens to have both:
number mean is mean(counts.count)
Mean also doesn't provide any information on how data should be distributed, because it is also composable and O(c) updatable. If I know k means and the number of elements used to generate each of them, I can find the total mean. As far as implementation goes, I would likely generate and maintain a sum tuple as mentioned above rather than update a mean directly; repeatedly multiplying and dividing floating point numbers would accumulate floating point errors.
Those details though would be handled in the definition of mean and I'm not going to the depth of describing how these functions are defined yet.
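As a rough illustration of what "composable and O(c) updatable" means for mean, here is a minimal Python sketch that keeps a (sum, count) pair per node and only divides when the mean is read. The `MeanState` name and its methods are hypothetical.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class MeanState:
    """Running (sum, count) pair; the mean is derived only on demand."""
    total: float = 0
    n: int = 0

    def add(self, value):
        # O(c) update: one addition and one increment per new element
        return MeanState(self.total + value, self.n + 1)

    def merge(self, other):
        # Composable: states from different nodes combine by addition
        return MeanState(self.total + other.total, self.n + other.n)

    @property
    def mean(self):
        return self.total / self.n
```

With integer counts the sums stay exact, so the only floating point operation is the final division.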
number deviation is deviation(counts.count)
A single operation (square root) on a composable and O(c) updatable number, so still no dictates on how data is distributed.
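A minimal sketch of the same idea for deviation, assuming population (not sample) deviation and a hypothetical `DeviationState` that keeps the count, the sum and the sum of squares. The square root is the only step applied after the per-node states are merged.

```python
import math
from dataclasses import dataclass


@dataclass(frozen=True)
class DeviationState:
    """Running count, sum and sum of squares; all three compose by addition."""
    n: int = 0
    total: float = 0
    total_sq: float = 0

    def add(self, value):
        return DeviationState(self.n + 1,
                              self.total + value,
                              self.total_sq + value * value)

    def merge(self, other):
        return DeviationState(self.n + other.n,
                              self.total + other.total,
                              self.total_sq + other.total_sq)

    @property
    def deviation(self):
        mean = self.total / self.n
        variance = self.total_sq / self.n - mean * mean
        # Clamp tiny negative values caused by rounding before the root
        return math.sqrt(max(variance, 0.0))
```

The sum-of-squares form can lose precision when the values are large relative to their spread; for small integer view counts that is unlikely to matter, but a production version might prefer a more stable merge formula.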
filter counts on .count > mean - 3 * deviation and .count < mean + 3 * deviation
This is an interesting statement. Imagine that the mean and deviation were immutable and I knew them when inserting a new element. I would want to distribute tuples within this range evenly across all the nodes so that coming computations would be spread evenly across the nodes. I don't, however, know those figures, so I have three options:
The method I like the most is the second. Imagine a cluster that while it is not working on processing is working on reconfiguring itself so later processing will be faster. This isn't an abstract adaptive intelligence sort of idea, but rather something concrete where because information is known about the future operations that will be done (from the data definition program), the system can attempt to redistribute data so future queries are faster.
If there are operations like reindexing that require significant time, you could actually have say two nodes replicating the same data and they could agree between themselves to change the load balancing so one has time to run the computation. There are all sorts of interesting ideas when you have confederating stores attempting to learn the optimal processing methods.
This could either be done through confederations of nodes or managed through a central server. I don't really have the clustering background to suggest an algorithm at this point.
set dotProducts is { reference to items as itemOne,
reference to items as itemTwo,
sum(counts[.item = itemOne and .user = user].count *
counts[.item = itemTwo and .user = user].count) as product }
The optimization on the previous statement was to attempt to balance the items equally across all servers. The evaluation of this query requires that the counts identified by the join are on the same server. (I have to know a and b to compute ab.) As before, this optimization could be done on addition, but I like the idea of an adapting confederation. Say, for example, that a store requests an element for a join: if it knows that it has space to store the other tuples in the join, then it absorbs those tuples. This isn't caching, but the next time the query is evaluated, the network latency will have been eliminated.
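One way to picture the co-location requirement: since the join is on user, placing all of a user's counts on one node lets each node compute partial dot products locally, and the partial sums then compose by addition. The Python sketch below illustrates that one placement option only; it is not the adaptive scheme described above, and `node_for`, `partition_counts`, `local_dot_products` and `merge_partials` are hypothetical names.

```python
import zlib
from collections import defaultdict


def node_for(user, n_nodes):
    """Pick a node from a stable hash of the join key (the user)."""
    return zlib.crc32(user.encode("utf-8")) % n_nodes


def partition_counts(counts, n_nodes):
    """counts: {(item, user): count}. Returns one {user: {item: count}} per node."""
    nodes = [defaultdict(dict) for _ in range(n_nodes)]
    for (item, user), c in counts.items():
        nodes[node_for(user, n_nodes)][user][item] = c
    return nodes


def local_dot_products(node):
    """Partial dot products using only the counts held on this node."""
    partial = defaultdict(int)
    for item_counts in node.values():
        for a, ca in item_counts.items():
            for b, cb in item_counts.items():
                if a != b:
                    partial[(a, b)] += ca * cb
    return partial


def merge_partials(partials):
    """Sum is composable, so per-node partial results simply add up."""
    total = defaultdict(int)
    for partial in partials:
        for pair, value in partial.items():
            total[pair] += value
    return dict(total)
```

No node ever needs a count stored elsewhere to evaluate its share of the join, so the only network traffic is the final merge of partial sums.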
order similarity by .similarity
Knowing that the elements are going to be ordered, an order tuple can be maintained. Adding a new entry to a ten million item ordered list is significantly less time intensive than ordering a ten million item list.
Actually though the bulk of the list doesn't need to be ordered. Out of a million items, the interesting ones are going to constantly be the top fifty or so. When a new item is added I can figure out if it is in the top k, and if it is then insert it in the appropriate place. If it isn't, I can just not worry about it. If a query comes through for a block outside the top fifty then I just have to first find the space between fifty and what is requested and then sort it. After that I maintain the new length.
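A minimal Python sketch of that partially ordered list: a sorted prefix of length k, an unsorted remainder, and a lazy sort that extends the prefix when a query asks for more. The `LazyRanking` name and its methods are hypothetical, and highest similarity first is assumed.

```python
import bisect


class LazyRanking:
    """Keeps the top k entries sorted and everything else unsorted."""

    def __init__(self, k):
        self.k = k
        self._top = []   # ascending list of (score, item), at most k long
        self._rest = []  # unsorted entries that rank below the top k

    def add(self, score, item):
        entry = (score, item)
        if len(self._top) < self.k:
            bisect.insort(self._top, entry)
        elif entry > self._top[0]:
            # Belongs in the top k: insert in place, demote the old lowest
            bisect.insort(self._top, entry)
            self._rest.append(self._top.pop(0))
        else:
            # Below the cutoff: no ordering work at all
            self._rest.append(entry)

    def first(self, n):
        """Return the n best items, extending the sorted prefix if needed."""
        if n > self.k:
            take = n - len(self._top)
            self._rest.sort()
            promoted = self._rest[-take:]
            self._rest = self._rest[:-take]
            # promoted entries all rank below the current prefix
            self._top = promoted + self._top
            self.k = n  # maintain the new length from now on
        return [item for _, item in reversed(self._top)][:n]
```

For simplicity this sorts the whole remainder when the prefix grows; selecting only the block between the old and new lengths would do less work on a large list.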
If I were actually implementing this, the high level operations would be compiled down to a set of atomic relational algebra operations which had known mathematical ordering, decomposition and parallelization attributes. Optimizations would be done within that tree. That tree would then serve as the basis for the adaptive balancing on the part of the nodes.
Since I don't have time to do the actual program though, I need to figure out how to cull this to a single instance implementation.