Monthly Archives: June 2009

version 0.8 beta

We are happy to announce the release of version 0.8 beta on SourceForge. This release packages both the scale-out (distributed) and scale-up (single-server) versions of the bigdata RDF store. This release is capable of loading 1B triples in well under one hour on a 15-node cluster and has been used to load up to 13B triples on the same cluster. It also captures substantial improvements in the scale-out architecture. While query performance has not been optimized recently, it is nevertheless quite reasonable.



Enterprise ready scale-out?

People have been inquiring whether the bigdata scale-out architecture is enterprise ready, and, if not, what it would take to get there. Bigdata, as a scale-up platform, has been in operational use for years, but this is still an early adopter phase for the scale-out architecture. What follows is an overview of some enterprise features and their current state of readiness.

If you need some feature that is not finished yet, consider getting involved or helping out by funding feature development. We can tackle any of these issues within a few months.

Datalog support for query time inference. Bigdata has an efficient internal rule execution model. SPARQL queries are translated into the internal rule model and then executed using distributed joins. Some entailments are already computed at query time, but generalized query time inference requires rewriting the query and the RDF(S)+ entailment rules into a minimum-effort program which computes exactly those entailments required to answer the query. We plan to do this through a magic sets integration.

Status: Early development.

Transactional semantics. In order to use a semantic web database as a transactional system at scale, you must use either NO inference (just triples) or query time inference rather than eager materialization of the entailed triples. The issue is that eager materialization of inferences requires the total serialization of all transaction commits, which would be an unacceptable performance bottleneck regardless of the rest of the architecture. Either way, the semantics become those of the underlying database concurrency control algorithm, which in this case is MVCC. Further, in order to avoid race conditions for the lexicon (the mapping from RDF Values, that is URIs, Literals, and blank nodes, onto internal 64-bit unique identifiers), we use an ACID, but non-transactional, consistent write strategy. This guarantees a consistent mapping of RDF Values onto internal identifiers without limiting concurrency.

Status: Read-only transactions are done and are used to support high-level query. Full distributed read-write transaction support is mostly finished, but we are still working on the distributed commit protocol. MVCC is fully supported in the data services and the indices.
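A minimal single-process sketch of the lexicon write strategy follows. It assumes an in-memory dictionary guarded by a lock in place of bigdata's distributed term indices, and the `Lexicon` class and its methods are hypothetical names. What it illustrates is the property claimed above: writers may race to add the same RDF Value, but the insert-if-absent step is atomic, so every writer observes the same identifier without any transaction being involved.

```python
import itertools
import threading
from concurrent.futures import ThreadPoolExecutor


class Lexicon:
    """Hypothetical term dictionary mapping RDF Values onto integer ids.

    Each distinct term is assigned an identifier exactly once, even when
    several writers race to add it, and no transaction is involved.
    """

    def __init__(self):
        self._term2id = {}
        self._id2term = {}
        self._next_id = itertools.count(1)  # Python ints stand in for 64-bit ids
        self._lock = threading.Lock()

    def add_term(self, term):
        # Fast path: the term already has an identifier.
        term_id = self._term2id.get(term)
        if term_id is not None:
            return term_id
        # Slow path: atomic insert-if-absent. Re-check under the lock so
        # that only the first writer assigns the identifier.
        with self._lock:
            term_id = self._term2id.get(term)
            if term_id is None:
                term_id = next(self._next_id)
                self._id2term[term_id] = term
                self._term2id[term] = term_id
            return term_id

    def get_term(self, term_id):
        return self._id2term[term_id]


# Many concurrent writers add overlapping terms (URI, Literal, blank node).
lex = Lexicon()
terms = ['<http://example.org/alice>', '"Alice"', '_:b0'] * 100
with ThreadPoolExecutor(max_workers=8) as pool:
    ids = list(pool.map(lex.add_term, terms))
print(len(set(ids)))  # 3: one identifier per distinct term
```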

Concurrency Control. Bigdata uses Multi-Version Concurrency Control. We timestamp all tuples in the indices. Transactional commits identify write-write conflicts based on those timestamps. If the timestamp has been changed since the ground state from which the transaction is reading, then there is a write-write conflict. For RDF, we can reconcile write-write conflicts. The typical situation is that two transactions both write the same triple on the database. This is normally not viewed as a conflict since the RDF data is not typically used to establish ad-hoc locking protocols! Further, if we are using either query-time or NO inference, then there is no value associated with the tuples in the statement indices. All of the information is captured by the key. Write-write conflicts do arise, but only when some statements are being retracted.

Status: Done.
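The validation step can be sketched as follows. This is a toy, single-process model and not bigdata's API: the `StatementIndex` and `Tx` names are hypothetical, each statement key carries only a commit timestamp and an asserted/retracted flag, and commits are applied one at a time. It shows the two cases described above: two transactions writing the same triple are reconciled, while an assertion racing with a retraction of the same triple is rejected as a write-write conflict.

```python
from dataclasses import dataclass, field


class WriteWriteConflict(Exception):
    """Raised when a transaction's write collides with a newer commit."""


@dataclass
class Version:
    timestamp: int  # commit time of the most recent write on this key
    present: bool   # True if the triple is asserted, False if retracted


class StatementIndex:
    """Toy MVCC statement index keyed by (s, p, o); not bigdata's API."""

    def __init__(self):
        self.versions = {}
        self.last_commit = 0

    def begin(self):
        # The transaction reads from the last commit point (its ground state).
        return Tx(self, ground_state=self.last_commit)

    def commit(self, tx):
        # Validation: any key committed after our ground state is a
        # candidate write-write conflict.
        for key, present in tx.writes.items():
            v = self.versions.get(key)
            if v is not None and v.timestamp > tx.ground_state:
                if v.present != present:
                    # Assert vs. retract of the same triple: a real conflict.
                    raise WriteWriteConflict(key)
                # Both wrote the same state: reconcile and carry on.
        # Validation passed: install the writes under a new commit time.
        self.last_commit += 1
        for key, present in tx.writes.items():
            self.versions[key] = Version(self.last_commit, present)
        return self.last_commit


@dataclass
class Tx:
    store: StatementIndex
    ground_state: int
    writes: dict = field(default_factory=dict)

    def add(self, s, p, o):
        self.writes[(s, p, o)] = True

    def retract(self, s, p, o):
        self.writes[(s, p, o)] = False


db = StatementIndex()
triple = ("ex:a", "rdf:type", "ex:C")

# Two transactions write the same triple: reconciled, both commit.
t1, t2 = db.begin(), db.begin()
t1.add(*triple)
t2.add(*triple)
db.commit(t1)
db.commit(t2)

# One retracts while the other asserts: the second commit is rejected.
t3, t4 = db.begin(), db.begin()
t3.retract(*triple)
t4.add(*triple)
db.commit(t3)
try:
    db.commit(t4)
    conflict = False
except WriteWriteConflict:
    conflict = True
print(conflict)  # True
```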

Locking. MVCC does not use locks for concurrency control. Instead, it validates writes during the commit protocol as described above. We do support synchronous distributed locks through an integration with ZooKeeper, but that is not part of the concurrency control architecture.

Status: Done.

HA architecture. There are two alternatives here. The data service (DS) is the container for the index partitions (key-range shards). There are logical data services and physical data services. Clients always write on the master DS for a given logical DS.

Alternative 1. Clients can read from any physical DS for a given logical DS. Storage can be on either local disk or SAN/NAS. Local disk is acceptable for this alternative because the data are replicated across multiple machines, which provides built-in media redundancy. The master DS pipelines writes to a failover chain of k secondary DSs. That pipeline is flushed during the commit protocol by the master DS. The commit succeeds once the writes are on stable storage on the master and all of the secondaries; otherwise it fails and is rolled back. If the master fails, then the 1st secondary is elected as the new master. We handle master election using ZooKeeper. ZooKeeper was developed by Yahoo! as a distributed lock and configuration management service and is now an Apache subproject (part of Hadoop). Among other things, it gets master election protocols right.

Alternative 2. Storage is a shared volume (SAN/NAS). The secondary DSs are registered but inactive until the master fails, at which point the 1st secondary in the failover chain re-opens the same persistence store from the service directory on the SAN/NAS.

Status: Failover has not been implemented. Alternative 2 is the easiest to realize, and many organizations prefer to manage storage separately from servers. Alternative 1 probably has the best price/performance for deployments since it can use local disk.
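Since failover is not implemented, the following is only a hypothetical sketch of Alternative 1, simulated in memory. The class names are invented, stable storage is a Python list, and the ZooKeeper master election is replaced by simply promoting the first surviving service in the chain. It shows the commit rule (durable on the master and every secondary, or rolled back) and what the new master holds after a failure.

```python
class DataService:
    """Hypothetical physical data service holding one copy of the data."""

    def __init__(self, name):
        self.name = name
        self.alive = True
        self.buffered = []  # writes received but not yet committed
        self.stable = []    # writes on stable storage

    def receive(self, write):
        self.buffered.append(write)

    def flush(self):
        self.stable.extend(self.buffered)
        self.buffered.clear()

    def rollback(self):
        self.buffered.clear()


class LogicalDataService:
    """A master plus an ordered failover chain of k secondaries."""

    def __init__(self, services):
        self.chain = list(services)  # chain[0] is the master

    @property
    def master(self):
        return self.chain[0]

    def write(self, write):
        # The master pipelines every write down the failover chain.
        for ds in self.chain:
            if ds.alive:
                ds.receive(write)

    def commit(self):
        # Succeed only if the master and every secondary can make the
        # writes durable; otherwise roll back everywhere.
        if not all(ds.alive for ds in self.chain):
            for ds in self.chain:
                ds.rollback()
            return False
        for ds in self.chain:
            ds.flush()
        return True

    def fail_over(self):
        # Drop failed services, preserving chain order, so the 1st live
        # secondary becomes the new master (stand-in for a ZooKeeper election).
        self.chain = [ds for ds in self.chain if ds.alive]


a, b, c = DataService("ds-a"), DataService("ds-b"), DataService("ds-c")
lds = LogicalDataService([a, b, c])  # master a, k = 2 secondaries

lds.write("w1")
ok1 = lds.commit()  # True: w1 is durable on a, b and c
lds.write("w2")
a.alive = False     # the master fails before the commit completes
ok2 = lds.commit()  # False: w2 is rolled back
lds.fail_over()
print(lds.master.name, lds.master.stable)  # ds-b ['w1']
```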

Backup and recovery. Bigdata uses a log-structured store known as a “journal” to buffer writes. Periodically, the journal will reach its nominal capacity of ~200 MB. At that point, there is an atomic cutover (“synchronous overflow”) to a new journal, and an asynchronous overflow process migrates the buffered writes onto read-optimized B+Tree files (“index segments”). Backup therefore entails copying the index segments, the current (live) journal, and the previous journal (to capture the buffered writes). The best time to do a backup is during the atomic cutover to the new journal. At that point, write activity on the journal is suspended and it may be snap-copied. In fact, the copy operation only needs to be protected for the root blocks, which are the first page of the journal on the disk. A backup protocol could be integrated into synchronous overflow processing with very low overhead. Backup must also capture new index segments as they are generated, so there is a second integration point for that (HA Alternative 1 already requires the synchronous propagation of index segments to the failover data services).
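The journal lifecycle and the resulting backup set can be modeled roughly as below. This is a simplified sketch under assumptions: file names stand in for real files, asynchronous overflow is called explicitly rather than running in the background, and `Store`, `synchronous_overflow`, and `asynchronous_overflow` are illustrative names. It records a backup set at each cutover, when writes on the journal are suspended, and shows why new index segments must also be picked up afterwards.

```python
from dataclasses import dataclass, field
from typing import List, Optional

NOMINAL_CAPACITY = 200 * 1024 * 1024  # ~200 MB journal extent


@dataclass
class Journal:
    name: str
    size: int = 0


@dataclass
class Store:
    live: Journal = field(default_factory=lambda: Journal("journal-0"))
    previous: Optional[Journal] = None
    segments: List[str] = field(default_factory=list)
    backups: List[List[str]] = field(default_factory=list)
    seq: int = 0

    def write(self, nbytes):
        self.live.size += nbytes
        if self.live.size >= NOMINAL_CAPACITY:
            self.synchronous_overflow()

    def synchronous_overflow(self):
        # Atomic cutover: the full journal becomes 'previous' and a new,
        # empty journal takes over as the live journal.
        self.seq += 1
        self.previous, self.live = self.live, Journal(f"journal-{self.seq}")
        # Writes are suspended at this instant, so it is the cheap moment
        # to record a consistent backup set.
        self.backups.append(self.backup_set())

    def asynchronous_overflow(self):
        # Later, buffered writes on the previous journal are migrated onto a
        # read-optimized index segment, which later backups must include.
        if self.previous is not None:
            self.segments.append(f"segment-{self.previous.name}")

    def backup_set(self):
        files = self.segments + [self.live.name]
        if self.previous is not None:
            files.append(self.previous.name)
        return files


store = Store()
store.write(150 * 1024 * 1024)  # below capacity: no overflow
store.write(60 * 1024 * 1024)   # crosses ~200 MB: cutover to journal-1
store.asynchronous_overflow()   # journal-0 is migrated to an index segment
print(store.backups[0])    # ['journal-1', 'journal-0']
print(store.backup_set())  # ['segment-journal-0', 'journal-1', 'journal-0']
```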

Offline recovery is a matter of restoring the persistent state of the services and re-starting the services. Service (re-)start is quite fast, but a total database recovery would not be a fast operation.

Lightweight recovery of data which has been overwritten by transactions that you need to roll back may be achieved using the history retention policy. When you configure a bigdata federation, you can specify the minimum retention age for historical commit points. This can be hours, days, or weeks. Bigdata will not release those commit points until their retention age has expired. This makes it possible to perform correcting actions which bring the database back into a desired state. It would be quite feasible to develop a feature where the database is rolled back to a historical commit point and transactions are then selectively reapplied from a log.

Status: Not implemented yet.
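Neither the rollback feature nor a transaction log exists yet, so the following is a purely hypothetical sketch of how a retention policy and selective replay could fit together. Commit points are kept in memory as sets of triples, times are plain numbers (read them as hours), and `CommitHistory` and `rollback_and_replay` are invented names. A commit point is released only once the commit that superseded it is older than the minimum retention age.

```python
from bisect import bisect_right


class CommitHistory:
    """Hypothetical record of retained commit points (not bigdata's API)."""

    def __init__(self, min_retention_age):
        self.min_retention_age = min_retention_age
        self.commit_times = []  # ascending commit times
        self.states = {}        # commit time -> frozenset of triples

    def commit(self, t, state):
        self.commit_times.append(t)
        self.states[t] = frozenset(state)

    def release(self, now):
        # Keep every commit point that was visible at some time within the
        # retention window; only release those superseded before it began.
        horizon = now - self.min_retention_age
        keep_from = max(bisect_right(self.commit_times, horizon) - 1, 0)
        released = self.commit_times[:keep_from]
        for t in released:
            del self.states[t]
        self.commit_times = self.commit_times[keep_from:]
        return released

    def state_as_of(self, t):
        # The view as of time t is the latest commit point at or before t.
        i = bisect_right(self.commit_times, t)
        if i == 0:
            raise KeyError(f"no retained commit point at or before {t}")
        return self.states[self.commit_times[i - 1]]


def rollback_and_replay(history, t, log, skip):
    """Roll back to commit point t, then re-apply logged transactions
    committed after t, except those whose ids are in `skip`."""
    state = set(history.state_as_of(t))
    for tx_time, tx_id, adds, removes in log:
        if tx_time <= t or tx_id in skip:
            continue
        state = (state - removes) | adds
    return state


ab, cd = ("ex:a", "ex:p", "ex:b"), ("ex:c", "ex:p", "ex:d")
log = [
    (1.0, "tx1", {ab}, set()),
    (2.0, "tx2", set(), {ab}),  # an erroneous retraction we want to undo
    (3.0, "tx3", {cd}, set()),
]
history = CommitHistory(min_retention_age=24.0)  # e.g. hours
state = set()
for tx_time, _, adds, removes in log:
    state = (state - removes) | adds
    history.commit(tx_time, state)

restored = rollback_and_replay(history, 1.0, log, skip={"tx2"})
print(sorted(restored))  # [('ex:a', 'ex:p', 'ex:b'), ('ex:c', 'ex:p', 'ex:d')]
print(history.release(now=27.0))  # [1.0, 2.0]: superseded before the window
```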


Incremental data load and query performance

We have been getting some questions about incremental data load and query performance. Let me tackle both issues here.

With RDFS+ databases, there is always a trade-off in when you materialize the statements entailed by the combination of the data and the ontology. This can be done eagerly, when the data are loaded into the database, or on demand, when a query is issued. Eager materialization (also known as eager closure) has advantages in some situations, but it adds latency between when your data are stable on the database and when the database can answer queries based on the new data, and it also takes up more space on the disk. The alternative is query-time inference, but the downside is that you must do more work to answer each query. In practice, bigdata, like most RDF databases, does a little bit of both, materializing some statements during data load and computing others dynamically during query.
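To make the trade-off concrete, here is a small sketch of eager closure over just two standard RDFS entailment rules, rdfs9 (type propagation along `rdfs:subClassOf`) and rdfs11 (transitivity of `rdfs:subClassOf`). It uses a naive fixed-point loop over Python tuples rather than anything resembling bigdata's rule engine. After closure, a type query is a simple lookup, but the database now stores three entailed triples in addition to the three loaded ones.

```python
TYPE, SUB = "rdf:type", "rdfs:subClassOf"


def eager_closure(triples):
    """Materialize two RDFS entailments (rdfs9, rdfs11) to a fixed point."""
    db = set(triples)
    while True:
        subs = [(s, o) for s, p, o in db if p == SUB]
        new = set()
        for c1, c2 in subs:
            # rdfs11: rdfs:subClassOf is transitive.
            new |= {(c1, SUB, c4) for c3, c4 in subs if c3 == c2}
            # rdfs9: every instance of c1 is also an instance of c2.
            new |= {(x, TYPE, c2) for x, p, o in db if p == TYPE and o == c1}
        if new <= db:  # fixed point: nothing new is entailed
            return db
        db |= new


data = {
    ("ex:Dog", SUB, "ex:Mammal"),
    ("ex:Mammal", SUB, "ex:Animal"),
    ("ex:rex", TYPE, "ex:Dog"),
}
db = eager_closure(data)  # done once, at load time
# Queries become simple lookups against the materialized triples ...
animals = {s for s, p, o in db if p == TYPE and o == "ex:Animal"}
# ... at the cost of storing the entailed triples on disk.
print(animals, len(db) - len(data))  # {'ex:rex'} 3
```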

One of the next features that we will introduce is a magic sets integration. This will allow us to compute more of the inferences at query time using a minimum cost “program”. The magic sets rewrite of the query is basically the original query, plus the entailment rules which were not materialized, plus a bunch of “gates” which prevent those rules from firing unless they are specifically required to answer the query, and even then they are fired with bindings which constrain their scope to exactly the necessary data. This integration will let us offer more flexible inference mechanisms and will give us another alternative for handling truth maintenance (when statements are retracted from the database).
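For the same two rules, the effect of a magic sets rewrite on the query `?x rdf:type ex:Animal` can be sketched by hand. This is not a general rewriter, and the rules in the docstring are an assumed, simplified rewrite: the query constant seeds a `magic` set of relevant classes (the “gate”), recursion only extends that set along `rdfs:subClassOf`, and the type rule fires only for classes in it. Unrelated data, such as the `ex:Car` hierarchy below, never enters the computation.

```python
TYPE, SUB = "rdf:type", "rdfs:subClassOf"


def instances_of(db, cls):
    """Answer ?x rdf:type <cls> via a hand-written magic sets rewrite.

    magic(cls).                                  -- seeded from the query
    magic(c1) :- magic(c2), subClassOf(c1, c2).  -- gated recursion
    answer(x) :- magic(c), type(x, c).
    """
    magic, frontier = {cls}, [cls]
    while frontier:  # only newly discovered classes are expanded
        c2 = frontier.pop()
        for c1, p, o in db:
            if p == SUB and o == c2 and c1 not in magic:
                magic.add(c1)
                frontier.append(c1)
    answers = {x for x, p, o in db if p == TYPE and o in magic}
    return answers, magic


data = {
    ("ex:Dog", SUB, "ex:Mammal"),
    ("ex:Mammal", SUB, "ex:Animal"),
    ("ex:rex", TYPE, "ex:Dog"),
    ("ex:Car", SUB, "ex:Vehicle"),  # irrelevant to the query
    ("ex:herbie", TYPE, "ex:Car"),
}
answers, magic = instances_of(data, "ex:Animal")
print(answers)         # {'ex:rex'}
print(sorted(magic))   # ['ex:Animal', 'ex:Dog', 'ex:Mammal']
```

Nothing is written back to the database here: the entailment `ex:rex rdf:type ex:Animal` exists only for the duration of the query.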

Bigdata is a Multi-Version Concurrency Control (MVCC) architecture. This means that readers never block. You can use a read-only transaction to place a read-lock on a pre-materialized view of the data, including any entailments which are eagerly materialized. Clients can then read from that view while you add (or remove) statements to (from) the database. Once the closure of the database has been updated, you can simply update the view from which the clients are reading and they will immediately begin to read against the new state. These updates can be small and fast, or they can be massive; bigdata simply does not care. If you are using eager closure, small incremental data loads will be quite fast, while larger loads will take more time to update the materialized statements but are generally more efficient. In practice, if you have a lot of small updates, it may be better to batch them together for more throughput, but this depends on your application. Either way, clients are still reading from your last consistent view, so the update is atomic from their perspective.
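The view-switching pattern can be illustrated with a toy MVCC store in which every commit publishes an immutable snapshot and readers address snapshots by commit time. The `VersionedStore` class is hypothetical and single-process; a real read-only transaction would pin a commit point on the data services rather than hold a Python object. The point is that the reader's view is untouched by the concurrent update until the client chooses to move to the newer commit time.

```python
import threading


class VersionedStore:
    """Toy MVCC store: each commit publishes a new immutable snapshot."""

    def __init__(self):
        self._snapshots = {0: frozenset()}  # commit time -> triples
        self._last = 0
        self._write_lock = threading.Lock()  # serializes writers only

    def read(self, commit_time):
        # Readers take no lock: a published snapshot never changes.
        return self._snapshots[commit_time]

    def commit(self, adds=(), removes=()):
        with self._write_lock:
            current = self._snapshots[self._last]
            self._last += 1
            self._snapshots[self._last] = (current - set(removes)) | set(adds)
            return self._last


store = VersionedStore()
view = store.commit(adds={("ex:rex", "rdf:type", "ex:Dog")})

# A writer loads new data plus its entailments while clients keep reading.
pending = store.commit(adds={("ex:Dog", "rdfs:subClassOf", "ex:Animal"),
                             ("ex:rex", "rdf:type", "ex:Animal")})
old = store.read(view)  # clients still see the previous consistent view
view = pending          # switching views is a single assignment
new = store.read(view)
print(len(old), len(new))  # 1 3
```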

“Historical states” are views which have been superseded by subsequent commits. Historical state is retained automatically for open transactions. In fact, bigdata can be configured as an immortal database, where all history is preserved, if you have the disk and the requirement for that capability. More commonly, you will configure the minimum age that historical state must be retained, say 1 day, and older data is gradually purged to reclaim disk space. This all happens transparently during what we call “overflow” processing; you never have to “vacuum” the database. When we cite high throughput like 300k triples per second or 1B triples in an hour, that is with ongoing overflow processing and dynamic index partitioning.

Query performance is quite good, and we are looking forward to giving everyone some hard numbers real soon now. Query performance on a single machine is comparable to the best commercial triple stores. Query performance on a cluster ranges from roughly equal to about 2x faster than the best commercial triple stores running on a single machine. So we are not losing any performance by running a distributed database, not even for small queries. We are going to wrap up the work we have been doing on data load throughput soon and then turn back to query performance optimization. While query performance on a cluster is good right now, we made some changes in how data is distributed across the cluster in order to achieve higher write rates, and now we need to re-optimize distributed query performance. In fact, I expect query performance to improve substantially when we do this, which is why we are not quoting numbers at this time.

Bigdata has a lot of advantages for query processing. For example, readers are non-blocking (MVCC) and can run concurrently, bloom filters are available at each index partition (aka key-range shard), so they can be applied to very large data sets efficiently, and we incrementally optimize the data sets by migrating buffered writes from a log-structured store onto read-optimized B+Tree segments. Overall, the scale-out architecture allows us to apply vastly more resources to query processing when compared with any single host solution.
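As an illustration of how a per-partition bloom filter avoids index reads, consider the sketch below. The filter parameters (1,024 bits, three hash functions derived from SHA-256) and the `IndexPartition` class are assumptions chosen for the example, not bigdata's settings. A negative answer from the filter is definitive, so lookups for keys that are absent from a shard usually skip the B+Tree entirely, and only the rare false positives pay for a read.

```python
import hashlib


class BloomFilter:
    """Minimal bloom filter: no false negatives, occasional false positives."""

    def __init__(self, nbits=1024, nhashes=3):
        self.nbits = nbits
        self.nhashes = nhashes
        self.bits = 0  # an int used as a bit array

    def _positions(self, key):
        # Derive nhashes bit positions from salted SHA-256 digests.
        for i in range(self.nhashes):
            digest = hashlib.sha256(bytes([i]) + key).digest()
            yield int.from_bytes(digest[:8], "big") % self.nbits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


class IndexPartition:
    """Hypothetical key-range shard guarded by its own bloom filter."""

    def __init__(self, keys):
        self.keys = set(keys)  # stands in for the on-disk B+Tree segment
        self.bloom = BloomFilter()
        for key in self.keys:
            self.bloom.add(key)
        self.index_reads = 0

    def contains(self, key):
        if not self.bloom.might_contain(key):
            return False  # definitely absent: no index read needed
        self.index_reads += 1  # possibly present: consult the B+Tree
        return key in self.keys


shard = IndexPartition(f"ex:s{i}".encode() for i in range(50))
hits = sum(shard.contains(f"ex:s{i}".encode()) for i in range(50))
misses = sum(shard.contains(f"ex:x{i}".encode()) for i in range(1000))
# index_reads is 50 (the hits) plus a handful of false positives.
print(hits, misses, shard.index_reads)
```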

We are working on a whitepaper covering the scale-out architecture, data load throughput, and query performance on a cluster. We are using synthetic data sets for this, but if you have a lot of data and queries, contact us; we’d be happy to run the numbers on your data!


Scale-up or scale-out?

Scale-up or scale-out? You can scale up to larger and larger problems by buying a machine with more RAM, or by buying an even fancier machine with dozens of CPUs and a huge amount of RAM. However, you swiftly reach the limits of what is practical (the former) or cost-effective (the latter). Commodity hardware is cheap and scale-out approaches let you make the most of it.

Bigdata loads 300,000 triples per second on a commodity cluster: 1,000,000,000 triples loaded in under an hour. You cannot touch performance like that on a single machine for anything near the same cost. And it is not just the CPU and the increased RAM, but also the parallel DISK I/O that comes along with the architecture. And when one machine fails, you just fail over. If you have sunk all those resources into a single (very) high-end server and it goes down, well, you are just out of luck. Bigdata is a fully persistent architecture. Service restart time is well under a second and your data is available immediately, not re-loaded from a log file.
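The two throughput figures above are consistent with each other, assuming the 300,000 triples per second rate is sustained for the whole load:

```latex
\[
3 \times 10^{5}\,\frac{\text{triples}}{\text{s}} \times 3600\,\frac{\text{s}}{\text{h}}
  = 1.08 \times 10^{9}\,\frac{\text{triples}}{\text{h}},
\qquad
\frac{10^{9}\ \text{triples}}{3 \times 10^{5}\ \text{triples/s}} \approx 3333\ \text{s} \approx 56\ \text{min}.
\]
```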

Bigdata is architected to be highly concurrent. Most databases limit you to one writer, or to one writer per index. In bigdata, we transparently and dynamically break down scale-out indices into key-ranges called index partitions (shards, really), and writes on different index partitions run concurrently. This means that the potential concurrency of the application grows with the data scale. And since we dynamically partition the data, you can always add more hardware to keep pace with your data size or your query demands without having to reload all your data.
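Dynamic key-range partitioning can be sketched with a toy index that splits any partition exceeding a size threshold at its median key and routes each key to its partition by binary search over the separator keys. This single-threaded model is an illustration under assumptions: the `PartitionedIndex` name and the four-key threshold are invented, bigdata's own split criteria are not modeled, and in a real deployment each partition could be served by a different data service with its own writer. Note how the number of partitions, and hence the number of independent writers, grows with the data.

```python
import bisect


class PartitionedIndex:
    """Toy scale-out index whose sorted keys live in key-range partitions."""

    def __init__(self, split_threshold=4):
        self.split_threshold = split_threshold
        self.separators = []    # separators[i] is the first key of partition i + 1
        self.partitions = [[]]  # each partition keeps its keys sorted

    def locate(self, key):
        # Route a key to the partition whose key range contains it.
        return bisect.bisect_right(self.separators, key)

    def insert(self, key):
        i = self.locate(key)
        bisect.insort(self.partitions[i], key)
        if len(self.partitions[i]) > self.split_threshold:
            self._split(i)

    def _split(self, i):
        # Split an overfull partition at its median key. Only this partition
        # changes; all other partitions (and their data) stay where they are.
        part = self.partitions[i]
        mid = len(part) // 2
        left, right = part[:mid], part[mid:]
        self.partitions[i:i + 1] = [left, right]
        self.separators.insert(i, right[0])


idx = PartitionedIndex(split_threshold=4)
for key in ["m", "c", "x", "a", "q", "f", "t", "b", "z", "h"]:
    idx.insert(key)
print(idx.separators)  # ['c', 'm', 't']
print(idx.partitions)  # [['a', 'b'], ['c', 'f', 'h'], ['m', 'q'], ['t', 'x', 'z']]
```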

We are still working on query performance tuning, but we have already seen query response times which are better than the best scale-up triple stores. Bigdata distributes the query processing across the hardware, doing JOINs right at the data and it uses MVCC (Multi-Version Concurrency Control), so we run concurrent read-consistent queries without blocking. It is able to put more CPU, more RAM and more DISK bandwidth on any given problem when compared to any single machine.

Oh, it runs on a single machine too.