The MySQL connector is a SQL connector that queries a MySQL database and maps rows to claims using a column mapping dictionary.
fetch() to yield claim dicts; the base class run() handles batching and ingestionpymysql with DictCursor for named column accessmysql:{query} with source type database_import
The mapping dict tells the connector which columns correspond to claim fields.
At minimum you need subject, predicate, and object keys.
Optionally include confidence to map a numeric column to claim confidence.
| Mapping key | Required | Description |
|---|---|---|
subject | Yes | Column name for subject entity |
predicate | Yes | Column name for predicate/relation |
object | Yes | Column name for object entity |
confidence | No | Column name for numeric confidence value |
The MySQL connector requires the pymysql package for database access.
$ pip install pymysql
You need the hostname, username, password, and database name for your MySQL server.
The default port is 3306. Ensure your user has SELECT privileges
on the tables you plan to query.
Write a SQL query that returns columns for subject, predicate, and object.
The connector runs the query and maps each row to a claim using your mapping dict.
conn = db.connect("mysql", host="localhost", user="root", password="secret", database="mydb", query="SELECT src, rel, dst FROM edges", mapping={"subject": "src", "predicate": "rel", "object": "dst"}, ) result = conn.run(db)
| Parameter | Required | Default | Description |
|---|---|---|---|
host |
Yes | — | MySQL server hostname |
user |
Yes | — | Database username |
password |
Yes | — | Database password |
database |
Yes | — | Database name |
query |
Yes | — | SQL query to execute |
mapping |
Yes | — | Column-to-claim field mapping dict |
port |
No | 3306 |
MySQL port |
source_type |
No | database_import |
Provenance source type |
save |
No | False |
Encrypt and persist credentials |
conn = db.connect("mysql", host="localhost", user="root", password="secret", database="mydb", query="SELECT src, rel, dst FROM edges", mapping={"subject": "src", "predicate": "rel", "object": "dst"}, ) result = conn.run(db)
conn = db.connect("mysql", dsn="mysql://user:pass@host/db", query="SELECT gene, relation, target FROM interactions", mapping={"subject": "gene", "predicate": "relation", "object": "target"}, ) result = conn.run(db)
conn = db.connect("mysql", host="db.example.com", user="reader", password=os.environ["MYSQL_PASS"], database="prod", port=3307, query="SELECT a, b, c FROM data", mapping={"subject": "a", "predicate": "b", "object": "c"}, save=True, ) result = conn.run(db)