MongoDB is a popular document store for backing Web applications, but ingesting its content into a Data Warehouse remains a challenge for the Data Engineer.
The weekly mood
We worked as a team on the functional specification of our API Gateway as well as on our Data Lake project, which I enjoyed very much.
As part of the Data Lake, we currently need to source and analyse information from a specific Web application that uses MongoDB as its operational database. Aggregations should be run inside our corporate Cloud Data Warehouse (DW) based on Snowflake, with BI reports in Tableau.
While this post focuses on MongoDB, I do plan a series of articles around Data Engineering and Architecture, including our findings on an end-to-end transformation pipeline from source to destination.
What is MongoDB
MongoDB is an open-source NoSQL database that stores its data in JSON documents. The project started in 2007 at a company called "10gen", which was renamed MongoDB Inc. in 2013 and went public in 2017 (NASDAQ: MDB).
Since then, stock price and revenue have been on a roll, so the business could become profitable soon. Reasons for that success are major improvements to the engine, as well as its restricted open-source license terms, which allow free use of MongoDB behind third-party commercial applications but not behind a managed database service. This bet played in favor of the official MongoDB Atlas offering, which is now available for AWS, Azure, GCP, IBM, Alibaba and Red Hat OpenShift.
The database originally gained popularity in the developer community for its broad ecosystem of drivers and tools. Thanks to its replicated commit log (aka replica set), MongoDB also achieves both High Availability (HA) via redundancy and single-IO performance (i.e. read, write and indexing operations) via horizontal scaling. This combination fits the requirements of highly concurrent applications pretty well. Some prominent Web platforms actually chose MongoDB as their main datastore: Codecademy, Coinbase, eBay, Expedia, Facebook, Foursquare and Uber.
However, note that the engine's performance might degrade when sharding (e.g. for high-throughput telemetry) and aggregating (e.g. for large-volume analysis). See also When to Use (and not to Use) MongoDB.
OLTP vs OLAP
MongoDB offers a suitable feature for data processing called the Aggregation Pipeline. However, since we are using the database to run an application in production, we obviously can't store too much historical data and fire aggregations in place: this would generate extra load on the system, put the application at risk and, as a consequence, the operational business.
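To make this concrete, here is a minimal pymongo sketch of such an in-place aggregation, exactly the kind of query we want to keep away from the production database. The connection details are assumptions matching the Rocket.chat setup used later in this post.

# Hypothetical sketch: count messages per room with the Aggregation
# Pipeline, directly on the operational database -- the kind of
# in-place load we want to avoid firing in production.
from pymongo import MongoClient

client = MongoClient("mongodb://rocketchat:changeme@mongo.local:27017/rocketchat")
db = client["rocketchat"]

pipeline = [
    {"$group": {"_id": "$rid", "messages": {"$sum": 1}}},  # group by room id
    {"$sort": {"messages": -1}},                           # busiest rooms first
]
for row in db.rocketchat_message.aggregate(pipeline):
    print(row)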
For that reason, it is common practice to decouple operational from analytical concerns:
1. Use a production database for OnLine Transactional Processing (OLTP) only.
2. Replicate its data, for example overnight as a full snapshot, or in near real-time as an asynchronous stream of delta changes.
3. Store the data into a system that is dedicated and optimized for OnLine Analytical Processing (OLAP), for example a Data Warehouse (DW).
Example application
The following hands-on is based on the open-source project Rocket.chat, another application based on MongoDB. Rocket.chat is an omni-channel messenger that claims to replace both traditional instant (chat) and asynchronous (email) communications with team members and customers.
We'll be using the Rocket.chat Helm chart with MongoDB v3.6.19, matching the database version of the internal application we want to analyse.
$ kubectl create namespace rocketchat
$ kubens rocketchat
$ helm install \
    --set mongodb.mongodbUsername=rocketchat \
    --set mongodb.mongodbPassword=changeme \
    --set mongodb.mongodbDatabase=rocketchat \
    --set mongodb.mongodbRootPassword=root-changeme \
    --set mongodb.image.tag=3.6.19-debian-9-r20 \
    --name my-rocketchat stable/rocketchat
The release deploys the Rocket.Chat web application along with a single-node MongoDB database server (without replica set). Once the pods are running, you only need to access the application URL, create a user, and then you are ready to go with chatting... well, a soliloquy at the beginning :-)
$ sensible-browser http://$(kubectl describe svc \
    my-rocketchat-rocketchat | grep Endpoints: | awk '{print $2}')
MongoDB client shell
Looking for direct access to the database, let us check the client available on the server machine.
$ kubectl exec my-rocketchat-mongodb-primary-0 -- mongo \
    -u root -p root-changeme \
    --eval="rs.slaveOk(); db.test.find().forEach(printjson)"
MongoDB shell version v3.6.19
connecting to: mongodb://127.0.0.1:27017/?gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("34099f03-caf6-4efa-99c7-96d5b0368989") }
MongoDB server version: 3.6.19
You may install the same client on any distant machine on the network, in this case my local machine.
$ echo "deb http://repo.mongodb.org/apt/ubuntu xenial/mongodb-org/3.6 multiverse" \ | sudo tee /etc/apt/sources.list.d/mongodb-org-3.6.list $ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 \ --recv 2930ADAE8CAF5059EE73BB4B58712A2291FA4AD5 $ sudo apt-get update $ apt-cache search "mongo" mongodb-org-shell - MongoDB shell client mongo-tools - collection of tools for administering MongoDB servers ... $ apt-cache showpkg mongodb-org-shell | grep " - \$" 3.6.19 - ... $ sudo apt-get install -y mongodb-org-shell=3.6.19
Before going further, it can be a good idea to map your MongoDB private IP to a user-friendly hostname.
$ echo "$(kubectl describe svc my-rocketchat-mongodb | grep Endpoints \ | awk '{print $2}' | cut -d':' -f1) mongo.local" \ | sudo tee -a /etc/hosts 10.1.94.217 mongo.local
We'll now dig just a little bit into our sample database.
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
--eval="db.getCollectionNames()" | grep message
"rocketchat_livechat_external_message",
"rocketchat_message",
"rocketchat_message_read_receipt",
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
--eval "db.rocketchat_message.findOne()" \
--quiet
{
"_id" : "MshE4AKDejGaiwJkv",
"t" : "uj",
"rid" : "GENERAL",
"ts" : ISODate("2020-08-30T15:21:37.974Z"),
"msg" : "tncad",
"u" : {
"_id" : "3jh6AN3eLZKRCD6E9",
"username" : "tncad"
},
"groupable" : false,
"_updatedAt" : ISODate("2020-08-30T15:21:37.974Z")
}
Database Dump
Of course you can export your query results by redirecting the system output of the mongo command to a file, or by using the mongoexport command. This approach produces one line per JSON document. Here is an example of operational statistics (metrics on the number and size of objects) for the "rocketchat_message" collection:
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
    --eval "db.rocketchat_message.stats()" \
    --quiet \
    > /tmp/$(date +%Y%m%d_%H%M%S)_rocketchat_message_stats.json # a file
Instead, it is recommended to use mongodump (for data) and mongostat (for metrics) for creating snapshots.
$ mongodump --uri "mongodb://rocketchat:changeme@mongo.local:27017/rocketchat" \
-o /tmp/$(date +%Y%m%d_%H%M%S)_rocketchat_dump # a folder
This produces two files per collection: a .bson data file and a .metadata.json file.
See also MongoDB shell quick reference for further client shell commands.
Our goal being analytics, a more efficient alternative is to take advantage of MongoDB Change Streams. However, we only have limited access to a Disaster Recovery (DR) instance in our real project, and discussions about activating the replica-set oplog on our version 3.6 system haven't led to an agreement so far.
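For reference, here is a minimal pymongo sketch of what tailing a change stream could look like, assuming a replica set (and thus an oplog) is available, which is precisely what our single-node test deployment lacks.

# Minimal sketch: tail delta changes of the message collection as an
# asynchronous stream. Requires MongoDB >= 3.6 *and* a replica set;
# our single-node Helm deployment does not qualify.
from pymongo import MongoClient

client = MongoClient("mongodb://rocketchat:changeme@mongo.local:27017/rocketchat")
db = client["rocketchat"]

with db.rocketchat_message.watch(full_document="updateLookup") as stream:
    for change in stream:
        # each event carries one insert/update/delete to forward downstream
        print(change["operationType"], change.get("fullDocument"))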
MongoDB standard UI = Compass
The client shell is fine for a minimal test or in case you can't start a UI. But it can be tedious to learn commands by heart and to repeatedly type or guess simple things that a UI gives you for free.
See also MongoDB Compass project.
$ curl -sL https://deb.nodesource.com/setup_lts.x | sudo -E bash -
$ sudo apt install -y nodejs
$ npm install
$ npm start compass
For some reason I couldn't easily solve a nodejs/npm version incompatibility in my environment, so I just gave up on Compass.
MongoDB unofficial UI = Robo3t
I already knew that the alternative client Robo3T (formerly "Robomongo") works well on Ubuntu.
$ wget https://github.com/Studio3T/robomongo/releases/download/v1.3.1/robo3t-1.3.1-linux-x86_64-7419c406.tar.gz
$ tar -xf robo3t-*.tar.gz
$ rm robo3t-*.tar.gz
$ ./robo3t-1.3.1-linux-x86_64-7419c406/bin/robo3t
The client probably does not offer as many features as Compass, but at least it works well for me.
As we can see, such a client UI offers an immediate, and therefore better, overview of the number of collections (on the left) and the potential complexity of document schemata (on the right). As an example, the picture shows the rocketchat_message collection along with a message document.
Schema inference
We need to export some de-normalized, schemaless JSON documents and import them into a SQL-compliant, relational target. To do so, we need to infer the schema of each NoSQL collection (schemaless in theory, but in the case of Rocket.chat well defined and documented as part of a reference documentation). Different approaches are possible.
1. Schema-aware collection from source DB: MongoDB allows specifying a document schema for the purpose of document validation within a collection. In case such a schema exists, it shows up when invoking the getCollectionInfos function. Here is an example based on the "rocketchat_message" collection.
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
    --eval "db.getCollectionInfos({name:'rocketchat_message'})" \
    --quiet
[
    {
        "name" : "rocketchat_message",
        "type" : "collection",
        "options" : { },
        "info" : {
            "readOnly" : false,
            "uuid" : UUID("310cba0f-5fe3-412d-9951-056cec5ad397")
        },
        "idIndex" : {
            "v" : 2,
            "key" : { "_id" : 1 },
            "name" : "_id_",
            "ns" : "rocketchat.rocketchat_message"
        }
    }
]
The collection details reveal the primary key information, but unfortunately no "native" definition of the collection schema. Indeed, since schema validation is not a mandatory step for storing documents, most application developers (including those from Rocket.chat and ours) will ignore it.
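For completeness, here is a hedged pymongo sketch of what defining such a validation schema could look like. The collection name "validated_message" and the field selection are made up for illustration, not taken from Rocket.chat.

# Hypothetical sketch: create a collection with a $jsonSchema validator
# (supported since MongoDB 3.6), so that getCollectionInfos() would
# expose a usable schema definition.
from pymongo import MongoClient

client = MongoClient("mongodb://rocketchat:changeme@mongo.local:27017/rocketchat")
db = client["rocketchat"]

db.create_collection("validated_message", validator={
    "$jsonSchema": {
        "bsonType": "object",
        "required": ["rid", "msg", "ts", "u"],
        "properties": {
            "rid": {"bsonType": "string"},   # room id
            "msg": {"bsonType": "string"},   # message body
            "ts":  {"bsonType": "date"},     # timestamp
            "u":   {"bsonType": "object", "required": ["_id", "username"]},
        },
    }
})
print(list(db.list_collections(filter={"name": "validated_message"})))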
Alternatively, you may want to try the following:
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
    --eval "var allKeys = {}; db.rocketchat_message.find().forEach(function(doc){Object.keys(doc).forEach(function(key){allKeys[key]=1})}); allKeys;" \
    --quiet | sed 's/ : 1,/,/g;s/\s//g' | tr -d '\n'
{"_id","t","rid","ts","msg","u","groupable","_updatedAt","mentions","channels","drid","attachments","dcount","dlm","starred","reactions","replies","tcount","tlm","tmid":1}
This complete key list seems very useful, but it also messes up the object hierarchy: indeed, I expected "username" to be part of a "u" object, not of a "mentions" object.
2. Schema-aware collection from Dump: As mentioned above, the mongodump operation produced some JSON metadata along with the BSON data records. The following command takes advantage of the jq command-line utility.
$ cat rocketchat_message.metadata.json \
    | jq -c '.indexes[].key | keys' | tr -d '\n' | sed 's/\]\[/,/g'
["_id","_updatedAt","rid","ts","ts","u._id","editedAt","editedBy._id","rid","t","u._id","expireAt","_fts","_ftsx","file._id","mentions.username","pinned","snippeted","location","slackBotId","slackTs","unread","drid","tmid","tcount","tlm","navigation.token","pinnedBy._id","starred._id"]
Here we are facing the same issue as with the first option.
3. Schema-aware document from DB: Although JSON documents are self-describing, it is difficult to find a document that is representative of all documents in a collection. It is possible that the collection has no documents yet, that none of the existing documents is complete, or that the largest possible schema can be reverse-engineered only by merging multiple documents (e.g. depending on a product category, the user could place a category-specific order).
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
    --eval "db.getCollection('rocketchat_message').findOne()" \
    --quiet \
    | sed 's/[A-Za-z]*(\([^)]*\))/\1/g' | tr '\n' ' ' \
    > rocketchat_message_sample.json
$ cat rocketchat_message_sample.json \
    | jq -c '. | keys'
["_id","_updatedAt","groupable","msg","rid","t","ts","u"]
The sed command replaces all occurrences of MongoDB specific functions e.g. ISODate(), which are not JSON-valid. Note that MongoDB can deliver the schema in an even easier way:
$ mongo -u rocketchat -p changeme mongo.local:27017/rocketchat \
    --eval "doc=db.rocketchat_message.findOne(); for (key in doc) print(key);" \
    --quiet
_id
t
rid
ts
msg
u
groupable
_updatedAt
This looks good for the first level of hierarchy. But extending this script to all levels may require recursively testing for children, writing to a different format, or storing the result in a special in-memory object. In the end, the code might get hard to maintain and produce something other than what is actually expected; a minimal recursive variant is sketched below. See also How to find Schema of a Collection in MongoDB.
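As an illustration, here is a minimal Python sketch of such a recursive walk, producing dotted key paths. The sample document is a trimmed version of the message fetched earlier.

# Minimal sketch: recursively collect dotted key paths from a document,
# preserving the object hierarchy that the flat key listing lost.
def key_paths(doc, prefix=""):
    paths = []
    for key, value in doc.items():
        path = f"{prefix}.{key}" if prefix else key
        if isinstance(value, dict):
            paths += key_paths(value, path)  # descend into sub-documents
        else:
            paths.append(path)
    return paths

sample = {"_id": "MshE4AKDejGaiwJkv", "msg": "tncad",
          "u": {"_id": "3jh6AN3eLZKRCD6E9", "username": "tncad"}}
print(key_paths(sample))
# ['_id', 'msg', 'u._id', 'u.username']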
4. Schema-aware document from Dump: The following command takes advantage of the bsondump utility in order to find the document with the highest number of keys.
# read last document, schema and number of elements
$ bsondump rocketchat_message.bson | tail -1
{"_id":"u5fo4paTgR2zkqt7k","rid":"qCxWtTijs9mR3SgY5","tmid":"6PeavteZ6tyhgCsLD","msg":"cool","ts":{"$date":"2020-09-16T09:29:20.313Z"},"u":{"_id":"wHCH8we9aWS6qkWpo","username":"admin"},"mentions":[],"channels":[],"_updatedAt":{"$date":"2020-09-16T09:29:20.321Z"}}
$ bsondump rocketchat_message.bson | tail -1 \
    | jq 'select(objects)|=[.] | map( paths(scalars) | join(".") )'
[
  "_id",
  "rid",
  "tmid",
  "msg",
  "ts.$date",
  "u._id",
  "u.username",
  "_updatedAt.$date"
]
$ bsondump rocketchat_message.bson --quiet | tail -1 \
    | jq 'select(objects)|=[.] | map( paths(scalars) | join(".") ) | length'
8
# read all documents (provided that there are not too many)
$ IFS=$'\n'
$ my_array=( $(bsondump rocketchat_message.bson --quiet) )
$ echo ${#my_array[@]}
5
# read documents schema and number of elements, then keep largest one
$ for line in ${my_array[@]}; do printf "$(echo $line \
    | jq 'select(objects)|=[.] | map( paths(scalars) ) | map( map(select(numbers)="[]") | join(".") ) | length') $line\n"; \
  done | sort -u | head -1
11 {"_id":"gTY7o3NeHXecLFyNy","t":"discussion-created","rid":"zs2TxG6SLQLAhh3Lb","ts":{"$date":"2020-09-16T09:28:41.087Z"},"msg":"my-discussion","u":{"_id":"wHCH8we9aWS6qkWpo","username":"admin"},"groupable":false,"drid":"qCxWtTijs9mR3SgY5","attachments":[],"_updatedAt":{"$date":"2020-09-16T09:29:20.337Z"},"dcount":2,"dlm":{"$date":"2020-09-16T09:29:20.313Z"}}
In this last example, we assumed that the largest document in size holds a merge of all other possible document schemas; a safer variant that unions the keys of all documents is sketched below.
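If that assumption feels too optimistic, a more expensive but safer variant is to union the key paths over all documents of the collection, e.g. reusing the key_paths() helper sketched above against the live database.

# Safer sketch: union the key paths of *all* documents instead of
# trusting the largest one (key_paths() is the helper defined above).
from pymongo import MongoClient

client = MongoClient("mongodb://rocketchat:changeme@mongo.local:27017/rocketchat")
db = client["rocketchat"]

all_paths = set()
for doc in db.rocketchat_message.find():
    all_paths |= set(key_paths(doc))
print(sorted(all_paths))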
Hierarchy flattening
Besides extracting the schema, there are two possible approaches for converting a JSON document structure into a flat/relational one.
- Maintain the original schema by just flattening the object hierarchy in large tables with minimal Primary/Foreign Key (PK/FK) constraints:
- message
- _id (pk)
- rid
- msg
- ts
- u__id (fk)
- u_username
- urls_0_url
- etc.
- Factorize the schema by detaching any nested table including maximal integrity constraints:
- message
- _id (pk)
- rid
- msg
- ts
- etc.
- message_user
- message_id (pk, fk)
- user_id (pk, fk)
- user
- _id (pk)
- username
- message_url
- message_id (pk, fk)
- message_url (pk, fk)
- url
- url (pk)
- etc.
We actually need both approaches: the first one to turn hierarchical data into big tables for cleansing and masking the data, the second to create a normalized schema.
In the following part we will use Python together with the functional modules mentioned above, as well as the SQLAlchemy module and a SQLite database, to build and test a custom schema repository.
## flatten.py
# read input file
import json
with open(r'rocketchat_message_sample.json') as f:
    dt = json.load(f)
# flatten dictionary
from flatten_json import flatten
dic_flattened = flatten(dt)
#dic_flattened = (flatten(d) for d in dt) # generator variant for a list of documents
# use pandas
import pandas as pd
df = pd.DataFrame(dic_flattened, index=[0])
print('-- DF COLUMNS:\n', df.columns)
# create file db
from sqlalchemy import create_engine
engine = create_engine('sqlite:///db.sqlite.flat', echo=False)
# create schema and write records to db
df.to_sql('rocketchat_message', con=engine, if_exists='replace', index=False)
print('-- DB SELECT:\n', engine.execute('SELECT * FROM rocketchat_message').fetchall())
Command:
$ pip install -r requirements.txt
$ python flatten.py
Output:
-- DF COLUMNS:
 Index(['_id', 't', 'rid', 'ts', 'msg', 'u__id', 'u_username', 'groupable',
       '_updatedAt'],
      dtype='object')
-- DB SELECT:
 [('MshE4AKDejGaiwJkv', 'uj', 'GENERAL', '2020-08-30T15:21:37.974Z', 'tncad', '3jh6AN3eLZKRCD6E9', 'tncad', 0, '2020-08-30T15:21:37.974Z')]
Note: The url table is ignored since there was no corresponding record in our sample.
From there you might want to run further checks using the SQLite client.
$ sudo apt install sqlite3
$ sqlite3 db.sqlite.flat
SQLite version 3.22.0 2018-01-22 18:45:57
Enter ".help" for usage hints.
sqlite> .tables
rocketchat_message
sqlite> .schema rocketchat_message
CREATE TABLE rocketchat_message (
    _id TEXT,
    t TEXT,
    rid TEXT,
    ts TEXT,
    msg TEXT,
    u__id TEXT,
    u_username TEXT,
    groupable BOOLEAN,
    "_updatedAt" TEXT,
    CHECK (groupable IN (0, 1))
);
sqlite> .quit
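flatten.py covers the first approach; for the second (factorized) one, here is a minimal sketch against the same sample document, detaching the embedded user object into its own table. The file name db.sqlite.norm and the table names are made up for illustration.

## factorize.py -- hypothetical sketch of the second (normalized) approach
import json
import pandas as pd
from sqlalchemy import create_engine

with open('rocketchat_message_sample.json') as f:
    doc = json.load(f)

user = doc.pop('u')        # detach the nested user object
doc['u_id'] = user['_id']  # keep a foreign key on the message side

engine = create_engine('sqlite:///db.sqlite.norm', echo=False)
pd.DataFrame([doc]).to_sql('message', con=engine, if_exists='replace', index=False)
pd.DataFrame([user]).to_sql('user', con=engine, if_exists='replace', index=False)

print(engine.execute(
    'SELECT m.msg, u.username FROM message m JOIN user u ON m.u_id = u._id'
).fetchall())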
Take-away
We have discussed what it takes to turn some MongoDB source dataset into a relational target schema.
We left the actual data modeling, mapping, transformation and automation up to a future post.
In particular, we will look into a data pipeline implementation that is production-ready. In other words, it needs to be reliably operated (repeatable behavior, automated planning and scheduling, availability, etc.) and maintained (auditability, error and change management, etc.).
Source code