February 14, 2023

A Guide to Listing Files and Directories with (Py)Spark, or How To Summon the Beast

Different methods for traversing file-systems with PySpark. From the mundane to the occult.

Prepare for some black magic

Frequently in data engineering there arises the need to get a listing of files from a file-system so those paths can be used as input for further processing. Most reader functions in Spark accept lists of higher-level directories, with or without wildcards. However, a single read applies a single schema, which constrains all the data in that read to adhere to it. In other words: all the Parquet files in a single read need to have the same DDL.

Sometimes it is unknown beforehand what the data entails, or it is known to be heterogeneous but needs to be restructured and/or repartitioned. In those scenarios, having a list of paths to loop over is very convenient. This article details several ways of creating such lists, each with its own trade-offs.

The last version will require some black magic, so make sure to bring a goat and pentacle.

Don't Use Spark

File-systems that are locally mounted on the driver node can be easily accessed using tools like Python's os.listdir() or the glob package. Within the JVM space there is the java.nio.file package.

If bigger traversals are needed, it is tempting within Python to reach for something like os.walk(), but be aware that if the underlying file-system is distributed, like ADLS2 or S3, recursively traversing a directory structure can add up greatly in terms of round-trip time. If this is not an issue (e.g. it is a shallow tree), then this is by far the cleanest option.
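To make this concrete, here is a minimal sketch of such a driver-local scan using only the standard library (the mount point below is just the placeholder path used throughout this post):

import glob
import os

base_path = "/mnt/my_data/"

# Non-recursive: only the direct children of the mount point.
entries = os.listdir(base_path)

# Recursive: every Parquet file below the mount point. On a distributed
# file-system each directory visited costs a round-trip.
parquet_files = glob.glob(os.path.join(base_path, "**", "*.parquet"), recursive=True)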

HDFS Using Only The PySpark Driver

Moving towards Spark also means using Hadoop's FileSystem API, with which both ADLS2 and S3 are compatible. If you are on Databricks, you can consider the closed-source dbutils package, available in both Python and Scala. This can give some trouble with mypy and linters, but there are ways around that.
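For a quick impression, a listing with dbutils might look like the sketch below (assuming a Databricks notebook, where dbutils is predefined, and the same placeholder mount point as before):

for info in dbutils.fs.ls("/mnt/my_data/"):
    # Each FileInfo carries a path, name and size.
    print(info.path, info.size)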

If you are using Python and PySpark, then it is good to know that the HDFS API is actually also available to you directly, via the Py4J bridge that wraps PySpark's SparkSession around the JVM's SparkSession. For the remainder of this post, keep in mind that these are two separate objects.

First let's assume there is some data on a distributed file-system that we want to scan using a HDFS glob pattern. Note: this is not completely compatible with Python glob patterns; e.g. with the latter, curly braces are not supported.

import os

base_path = "/mnt/my_data/"
glob_pattern = "*"

The end goal here is to call FileSystem.globStatus(), so a FileSystem object and its dependencies need to be obtained on the JVM. The general syntax for interacting with objects on the JVM via Py4J is to use the canonical class name. Let's see how that works from the SparkContext by creating a URI object:

jvm = spark.sparkContext._jvm
fs_root = jvm.java.net.URI.create(base_path)

First the reference to the JVM bridge is assigned to a separate variable for readability. Then the URI object is obtained by calling create(); this will be the root of our file-system. Next we get a reference to Spark's Hadoop Configuration. Notice the _jsc member, which is the SparkContext on the JVM:

conf = spark.sparkContext._jsc.hadoopConfiguration()

These two ingredients allow us to get a FileSystem object (for other recipes see the API docs).

fs = jvm.org.apache.hadoop.fs.FileSystem.get(fs_root, conf)

Finally we can scan the file-system and get a list of FileStatus objects.

path_glob = jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
status_list = fs.globStatus(path_glob)

for status in status_list:
    raw_path = status.getPath().toUri().getRawPath()
    if status.isDirectory():
        print(f"{raw_path} is a directory")
    else:
        print(f"{raw_path} is a file")

Putting it all together yields the following snippet:

import os

base_path = "/mnt/my_data/"
glob_pattern = "*"

jvm = spark.sparkContext._jvm
fs_root = jvm.java.net.URI.create(base_path)

conf = spark.sparkContext._jsc.hadoopConfiguration()
fs = jvm.org.apache.hadoop.fs.FileSystem.get(fs_root, conf)

path_glob = jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
status_list = fs.globStatus(path_glob)

for status in status_list:
    raw_path = status.getPath().toUri().getRawPath()
    if status.isDirectory():
        print(f"{raw_path} is a directory")
    else:
        print(f"{raw_path} is a file")

Distributed The Scala Way

So far the code above only executes on a single node: the driver. For big path traversals this can take a long time, with many consecutive round-trips. Furthermore, with a big cluster at one's disposal, it would be much nicer to use all the nodes in the cluster for obtaining the list of paths. For this, Databricks posted an example on their Knowledge Base which uses the InMemoryFileIndex:

How to list and delete files faster in Databricks

Distributed The Python Way

Doing a distributed file-system scan in Python is, unfortunately, not so straightforward. In some cases you can use a notebook to mix Scala cells with Python cells, but (assuming you are not a savage) you will not put that in production.

A cleaner way would be to customize the Scala code, turn it into a JAR and incorporate it in your pipeline. The result can be written to the file-system itself (e.g. file_list.txt) and picked up by PySpark code in subsequent stages.
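Picking that listing up again on the Python side could then be as simple as the sketch below (file_list.txt being the hypothetical output of the Scala stage, one path per line):

# Collect the path listing written by the Scala stage back to the driver.
path_list = [row.value for row in spark.read.text("file_list.txt").collect()]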

Intuitively, if one has read the section above, another thing to try would be to use the InMemoryFileIndex.bulkListLeafFiles method via Py4J. But, alas, this is not available, because the method is package private, meaning it can only be accessed from members that are also in the org.apache.spark.sql package. Which (apparently) Scala Databricks notebooks are, but PySpark notebooks are obviously not!

As user Lou Zell points out, it is tricky to get around this limitation and get the "incantation" of the required constructor call right:

How to list files in S3 bucket using Spark Session?
Is it possible to list all of the files in given S3 path (ex: s3://my-bucket/my-folder/*.extension) using a SparkSession object?

So, without further ado: Get the goat and pentacles ready and let's summon a Scala object through Java's Reflection API in Python!

The main goal here is to create an instance of InMemoryFileIndex and call its listLeafFiles method. Ergo, we will not use the bulkListLeafFiles method of its companion object directly. The listLeafFiles method will call it for us.

For brevity, the snippets below assume that the variables defined above are still in scope. We start by calling the constructor of InMemoryFileIndex, for which we first need a Scala (not Java!) sequence of paths. Luckily, Spark provides a utility for converting a Python list into a Scala Seq:

hadoop_base_path = jvm.org.apache.hadoop.fs.Path(base_path)
paths = jvm.PythonUtils.toSeq([hadoop_base_path])

Then there is a choice to be made: you can feed the index a NoopCache, which saves memory but scans the file-system for every query, including an immediate scan when the object is created. The alternative is to use the FileStatusCache.

Both options are shown below, with the NoopCache first. Notice the dollar sign at the end of the canonical name: this is how Scala objects are translated to the JVM. To access a Scala object (e.g. the companion object of a case class) from outside of Scala, you append a dollar sign to the canonical name and read its MODULE$ field through Java's Reflection API.

noop_cache_clazz = jvm.java.lang.Class.forName("org.apache.spark.sql.execution.datasources.NoopCache$")
ff = noop_cache_clazz.getDeclaredField("MODULE$")
cache = ff.get(None)

For the FileStatusCache we will do a similar trick, but then go one step further by calling the getOrCreate method and passing it the JVM's (!) SparkSession:

file_status_cache_clazz = jvm.java.lang.Class.forName(
    "org.apache.spark.sql.execution.datasources.FileStatusCache$"
)
ff = file_status_cache_clazz.getDeclaredField("MODULE$")
jvm_spark_session = spark._jsparkSession
cache = ff.get(None).getOrCreate(jvm_spark_session)

With this we have all the dependencies to summon the beast in Python. Let's create an InMemoryFileIndex:

in_memory_file_index = jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
    jvm_spark_session,
    paths,
    jvm.PythonUtils.toScalaMap({}),
    jvm.scala.Option.empty(),
    cache,
    jvm.scala.Option.empty(),
    jvm.scala.Option.empty()
)

And let it do our bidding:

glob_path = jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
glob_paths = jvm.PythonUtils.toSeq([glob_path])
status_list = in_memory_file_index.listLeafFiles(glob_paths)

Of course, the beast never does entirely what was asked. The variable status_list is not converted into a Python list automatically, so we need to do this conversion ourselves:

path_list = []
it = status_list.iterator()
while it.hasNext():
    path_status = it.next()
    path_list.append(str(path_status.getPath().toUri().getRawPath()))

Putting all the components together into a single script is left as an exercise to the reader.

Discussion

This post started rather simple and ended rather complex. Obviously, it's best to KISS, but to play devil's advocate: The whole of PySpark is built around the Py4J bridge. It seems unlikely that this API will change anytime soon. So wrapping all of this in a separate module or class and using distributed (!) path scanning in Python, might just be the devil's bargain that brings your project forward.
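To sketch what that devil's bargain could look like, the snippet below bundles the steps from the previous sections into a single hypothetical helper (the name list_leaf_files is made up), using the FileStatusCache variant so repeated queries do not rescan the file-system. Treat it as a rough sketch rather than a drop-in implementation.

import os

def list_leaf_files(spark, base_path, glob_pattern="*"):
    """Distributed file listing via InMemoryFileIndex; returns a list of path strings."""
    jvm = spark.sparkContext._jvm
    jvm_spark_session = spark._jsparkSession

    # Obtain the FileStatusCache companion object through the Reflection API.
    file_status_cache_clazz = jvm.java.lang.Class.forName(
        "org.apache.spark.sql.execution.datasources.FileStatusCache$"
    )
    cache = file_status_cache_clazz.getDeclaredField("MODULE$").get(None).getOrCreate(jvm_spark_session)

    # Build the index rooted at base_path.
    base = jvm.org.apache.hadoop.fs.Path(base_path)
    index = jvm.org.apache.spark.sql.execution.datasources.InMemoryFileIndex(
        jvm_spark_session,
        jvm.PythonUtils.toSeq([base]),
        jvm.PythonUtils.toScalaMap({}),
        jvm.scala.Option.empty(),
        cache,
        jvm.scala.Option.empty(),
        jvm.scala.Option.empty()
    )

    # Scan for leaf files matching the glob and convert the Scala Seq to a Python list.
    glob_path = jvm.org.apache.hadoop.fs.Path(os.path.join(base_path, glob_pattern))
    statuses = index.listLeafFiles(jvm.PythonUtils.toSeq([glob_path]))

    path_list = []
    it = statuses.iterator()
    while it.hasNext():
        path_list.append(str(it.next().getPath().toUri().getRawPath()))
    return path_list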

Comments or feedback?

You can curse or bless me via the links below, as long as you keep your goats:

Twitter

LinkedIn

HackerNews