RFC: FollowTheMoney Data Lake Specification

Hey all :waving_hand:

Over the last few years, @pudo and I have had many discussions about a shared dream: a standardized and complete specification for FollowTheMoney-based datasets, including document storage, that would serve as an efficient base layer for a wider range of existing and upcoming investigative applications than Aleph currently covers.

We have just put together a summary of what has been worked on in this area so far, as well as our proposed specification. Feel free to comment on this and share your thoughts!

FollowTheMoney Data Lake Spec

This specification defines a data lake structure for use with OpenAleph, OpenSanctions and related systems. The idea is to produce a long-term storage mechanism for investigative graph data in source, intermediate and processed form.

Core concepts:

  • Datasets are logical units of source data, often representing a data origin, such as an official register or publication, or a leak of documents.
  • Data catalogs are index files that help make individual datasets more easily discoverable.
  • Entity files are data files in which individual FtM entities are stored in a ready-to-index form, i.e. they have been aggregated from fragments or statements. An indexer may need to add authorization information and apply denormalisations as needed.
  • Archive objects are files that represent source or intermediate document formats used in document forensics. They’re referenced from FtM entities via their SHA1 content checksum.

Function

The idea of a FtM data lake is to provide a platform for multi-stage processing of data into FtM format. Keeping this data at rest (rather than, for example, in an Aleph operational database or in ftm-store) promises modularity, a simpler architecture, and cost effectiveness.

The fundamental idea is to have a convention-based file system layout with well-known paths for metadata, and for information interchange between different processing stages.

Basic layout

A data lake file system may need to be able to hold metadata headers (e.g. Content-Disposition, Content-Type), so it's better to think of this as object storage (S3, GCS, MinIO) than a plain operating system FS.
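
As a minimal illustration of why object storage fits, assuming an S3-compatible backend and boto3 (the bucket, key and file names are purely made up for this sketch), headers travel with the object:

    import boto3

    s3 = boto3.client("s3")
    # Object stores keep Content-Type / Content-Disposition next to the
    # payload; a plain OS file system would need a sidecar file for that.
    with open("report.pdf", "rb") as fh:
        s3.put_object(
            Bucket="lake",
            Key="datasets/my_dataset/archive/00/de/ad/beef123456789012345678901234567890",
            Body=fh,
            ContentType="application/pdf",
            ContentDisposition='attachment; filename="report.pdf"',
        )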

datasets/
    catalog.json
    [name]/
        index.json
        versions/
            index-[ts]-[uuid].json
            
        .LOCK
        # If present, a crucial write operation is currently being
        # performed by one of the daemons. Contains timestamp of
        # start, anticipated end, and tolerance.
        
        archive/
            # sha1 is full file name, not `data` like in `servicelayer`:
            00/de/ad/beef123456789012345678901234567890

            # optional: file metadata (e.g. scraped headers) 
            00/de/ad/beef123456789012345678901234567890.json
            
            # text-only representation (e.g. from an OCR process); convenient
            # for non-Aleph analytics on the file system
            00/de/ad/beef123456789012345678901234567890.txt  
    
        mappings/
            [uuid-1]/
                mapping.yml
                sheet1.csv
                sheet2.csv

        entities/
            # idea for storing UI-generated entities:
            crud/
                [entity_id]/
                    # sortable IDs:
                    [uuid-1].json
                    
                    # missing `current.json` implies entity is deleted
                    current.json
            # append-only file fragments from ingest-file, bulk upload, etc:
            statements/
                [origin-1]/
                    [uuid-1].csv
                    [uuid-2].csv
                [origin-2]/
                    [uuid-3].csv
                # Instead of `origin`, are these `phase`, `stage`?

            # generated by an aggregator batch job on `statements/`:
            entities.ftm.json
            
            # generated by a vectorizing service
            entities.vectors.json  # {"id": <entity_id>, "data": <...>}

            # for delta generation:
            entities.hash
            
            # Alternative:
            aggregates/
                [run_id]/
                    .BEGIN  # timestamp as content
                    entities.ftm.json
                    .DONE  # timestamp as content
        
        xref/
            # Theoretical, but an inverted index for xref entity
            # blocking:
            xref.idx

        # generated by an aggregator batch job on entities and archives:
        # some of these are optional and depend on specific use cases/applications
        exports/
            statistics.json   # entity counts, pre-computed facets
            graph.cypher  # Neo4j
            statements.csv  # complete sorted statements
            documents.csv  # document metadata
            inverted.idx  # what entity IDs point to entity X
            
            # for UI rendering in apps (e.g. OpenAleph)
            files.json  # nested file graph (folder -> subfolder -> file)
            emails.json   # nested/resolved email entity graph
            
            archive.zstd
            # diff exports
            replay/  # ??
                statements.[from-date]-[to-date].csv.diff
                entities.[from-date]-[to-date].ftm.json.diff
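
As a small illustration of the archive sharding used above, here is a tiny helper that follows the example path literally (two-character directories from the first six hex digits, the remainder as the file name); that split is my reading of the example, not a fixed part of the spec:

    def archive_path(sha1: str) -> str:
        # Shard by the first six hex digits of the checksum. Note that the
        # layout comment ("sha1 is full file name") could also be read as
        # keeping the complete digest as the file name rather than the remainder.
        sha1 = sha1.lower()
        assert len(sha1) == 40, "expected a full SHA1 hex digest"
        return f"archive/{sha1[0:2]}/{sha1[2:4]}/{sha1[4:6]}/{sha1[6:]}"

    # archive_path("00deadbeef123456789012345678901234567890")
    # -> "archive/00/de/ad/beef123456789012345678901234567890"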

Some thoughts on this:

  • The entity data is not versioned here. In OpenSanctions, we’re actually using a subfolder called artifacts/[run_id] to identify different ETL runs. This may not apply as well to Aleph, since it has no strong segregation of individual ETL runs.
  • This still doesn’t have a nice way to do garbage collection on the archive without refcounting on entities.
  • We may want the entity object structure in the lake to be a new format, e.g. with a dataset field and statements lists on each entity (instead of properties).
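
To make that last bullet concrete, such a record could look roughly like this (all field names are hypothetical, loosely inspired by nomenklatura's statement model, and not part of this spec):

    # Hypothetical shape of a statement-based lake entity (illustrative only):
    entity = {
        "id": "entity-1",
        "schema": "Person",
        "dataset": "my_dataset",
        "statements": [
            {"prop": "name", "value": "Jane Doe", "origin": "ingest"},
            {"prop": "birthDate", "value": "1970-01-01", "origin": "crud"},
        ],
    }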

Meet the daemons

Entity aggregator

A service that would traverse all individual statement files in the entities/statements folder, sort them into a combined order and then emit aggregated FtM entities.

Ideas: DuckDB doing a big fat UNION on the CSV files right from the bucket, or some monstrous Java MapReduce/Spark thing that is good at sorting a terabyte without breaking a sweat. (Output does not have to be FtM entities - a combined & sorted statements.csv has the same effect of making the data indexable).

See also:

  • ftm4 aggregate-statements command reading sorted statements to emit entities.
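
To sketch the DuckDB idea: the column names below are an assumption based on nomenklatura-style statement CSVs, the bucket path is illustrative, and S3 credentials would need to be configured separately.

    import duckdb

    con = duckdb.connect()
    con.execute("INSTALL httpfs; LOAD httpfs;")  # read the CSVs straight from the bucket
    con.execute("""
        COPY (
            SELECT *
            FROM read_csv_auto(
                's3://lake/datasets/my_dataset/entities/statements/*/*.csv',
                union_by_name = true)
            ORDER BY canonical_id        -- assumed grouping/sort column
        ) TO 'statements.sorted.csv' (HEADER, DELIMITER ',');
    """)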

Entity analyzer

A service that reads entities.ftm.json and performs analysis on (a filtered subset of) the entities, e.g. NER, language detection, translation, vectorization. New statements are chunked and written back to the lake.
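
A rough sketch of the loop such a service could run; analyze() and the column names are placeholders, not an existing API:

    import csv, json, uuid

    def analyze(entity: dict) -> list[dict]:
        # Placeholder: run NER / language detection / vectorization and
        # return new statements derived from the entity.
        return []

    rows = []
    with open("entities.ftm.json") as fh:
        for line in fh:                 # one FtM entity per line
            rows.extend(analyze(json.loads(line)))

    # Drop the results back into the lake as a new statement fragment
    # (the column names and the "analyze" origin are made up for this example):
    with open(f"{uuid.uuid4()}.csv", "w", newline="") as fh:
        writer = csv.DictWriter(fh, fieldnames=["entity_id", "prop", "value"])
        writer.writeheader()
        writer.writerows(rows)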

These microservices are already built with this lake concept in mind:

Entity indexer

logstash -j128 -i s3://lake/[dataset]/entities.ftm.json 

File ingestor

Reads uploaded documents from entities/crud (?) and then drops statement files into the statements folder every 60 MB (or after each document?).

If the backend supports notifications (e.g. via SQS, Pub/Sub), then the act of dropping a file into one origin/phase folder could trigger the subsequent layer of processing.
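
A sketch of such a trigger with SQS and boto3, assuming the bucket publishes standard S3 object-created events to a queue (the queue URL and the dispatch logic are made up):

    import json
    import boto3

    sqs = boto3.client("sqs")
    queue_url = "https://sqs.eu-central-1.amazonaws.com/123456789/lake-events"

    while True:
        resp = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=20)
        for msg in resp.get("Messages", []):
            event = json.loads(msg["Body"])
            for record in event.get("Records", []):
                key = record["s3"]["object"]["key"]
                if "/entities/statements/" in key:
                    # a new statement fragment arrived -> kick off aggregation
                    print("trigger aggregation for", key)
            sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])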

Catalog collector

Goes through each dataset folder and brings a reduced version of the dataset metadata into one big overview catalog.json. This then travels fairly directly into Aleph's collections database.
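
A minimal sketch over a locally mounted lake; which metadata fields survive the reduction is up to the spec, the ones below are placeholders:

    import json
    from pathlib import Path

    datasets = []
    for index_file in Path("datasets").glob("*/index.json"):
        meta = json.loads(index_file.read_text())
        # Keep only a reduced view per dataset (placeholder fields):
        datasets.append({
            "name": index_file.parent.name,
            "title": meta.get("title"),
            "entity_count": meta.get("entity_count"),
        })

    Path("datasets/catalog.json").write_text(json.dumps({"datasets": datasets}, indent=2))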

Concept for user edits (3rd party apps)

An app, e.g. for Network Diagrams, would fetch the complete entities.ftm.json, load it into a temporary store (e.g. DuckDB) and do read/write operations on it. After an edit session, the resulting store is exported back to the lake.
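
A sketch of that session lifecycle with DuckDB; the paths and table layout are illustrative:

    import duckdb

    con = duckdb.connect("session.duckdb")
    # Pull the aggregated entities into a local working table:
    con.execute("""
        CREATE TABLE entities AS
        SELECT * FROM read_json_auto('entities.ftm.json', format = 'newline_delimited')
    """)

    # ... interactive read/write operations during the edit session ...

    # Export the edited state back to the lake as newline-delimited JSON:
    con.execute("COPY entities TO 'entities.ftm.json' (FORMAT JSON)")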

Implementation stages

  • FtM 4.0 with dataset and catalog metadata specs
  • Make sure lake FS change notifications can be used for stage coupling
  • Build an ftm-store dumper
  • Find migration path for servicelayer archive

Long-term implications

  • This creates a flat-file alternative to ftm-store, using an external sorting mechanism to aggregate entity fragments.
  • The entities/crud/ section implicitly unifies the document and entity tables currently used in Aleph.
    • Introduces versioning - nice in WYSIWYG scenarios.
  • No need to have collection table, index and catalog.json.
  • While we’re at it, mappings become flat files (as is right and proper) and can be run by a daemon.
  • Lake folders can be copied between Aleph instances, making repeat processing (e.g. of leaks) unnecessary.
  • Re-indexing gains a huge performance boost (no sorting of ftm-store tables, efficient bulk indexing).
  • Option for file-based inverted index building that can allow high-performance cross-referencing (xref)
  • Improved infrastructure cost-efficiency, as the ftm-store PostgreSQL database (and therefore SSD storage) becomes obsolete

References

The concept outlined above is the result of a decade of open source tooling around this problem. A lot of experimental and production work has already been done within the FollowTheMoney/Aleph open source ecosystem:

  • servicelayer as the core document storage layer for Aleph/OpenAleph
  • nomenklatura by OpenSanctions for the statement-based entities model
  • anystore by DARC, a generic key/value store that can act both as a blob (file) storage backend and as a caching backend
  • leakrfc by DARC, a first experiment in a standardized way of storing distributed file archives that Aleph, OpenAleph, memorious and other clients can read from and write to.

Just realized we should probably also have a logs/ folder where we keep processing logs from individual daemons, perhaps in a standardized logging format so they can be slurped into a separate ES index if needed (as an audit trail).


:100:
if it’s standardized, it could be consumed by any log aggregator service of your choice :slight_smile:

One more: staging/ for the unprocessed source version of leaks? Or would it be staging/raw and staging/cleaned in practice?


Yes, I racked my brains over this last year when I was experimenting with leakrfc. Some of the open questions:

  • If using staging/raw, staging/cleaned and archive/, a leak would take up roughly three times the space.
    • This could obviously be mitigated by only storing “altered” documents in staging/cleaned.
    • Where does e.g. a PDF converted from an office document go? Is that a staging/cleaned location, or does it go straight into archive/?
    • Same for compressed archives: the sources live in staging/raw, but where do they get extracted to?
  • Maybe source leaks should just live somewhere else, and within the data lake there would be something like a Makefile for a complete leak that lists the cleaning procedures that led to what is now found in archive/.

This list goes on.

Keen to hear any thoughts on this :slight_smile: