Edit on GitHub

leaf_focus.pdf.xpdf

Text extraction from pdf using xpdf tools.

  1"""Text extraction from pdf using xpdf tools."""
  2
  3from __future__ import annotations
  4
  5import dataclasses
  6import json
  7import logging
  8import pathlib
  9import subprocess
 10
 11from datetime import datetime
 12
 13from beartype import beartype, typing
 14from defusedxml import ElementTree
 15
 16from leaf_focus import utils
 17from leaf_focus.pdf import model
 18
 19
 20logger = logging.getLogger(__name__)
 21
 22
 23@beartype
 24class XpdfProgram:
 25    """Interact with xpdf tools."""
 26
 27    OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] = (
 28        "Latin1",
 29        "ASCII7",
 30        "Symbol",
 31        "ZapfDingbats",
 32        "UTF-8",
 33        "UCS-2",
 34    )
 35    OPTS_TEXT_LINE_ENDING: tuple[str, str, str] = ("unix", "dos", "mac")
 36    OPTS_IMAGE_ROTATION: tuple[int, int, int, int] = (0, 90, 180, 270)
 37    OPTS_IMAGE_FREETYPE: tuple[str, str] = ("yes", "no")
 38    OPTS_IMAGE_ANTI_ALIAS: tuple[str, str] = ("yes", "no")
 39    OPTS_IMAGE_VEC_ANTI_ALIAS: tuple[str, str] = ("yes", "no")
 40
 41    def __init__(self, directory: pathlib.Path) -> None:
 42        """Create a new xpdf program class to interact with xpdf tools.
 43
 44        Args:
 45            directory: The path to the directory containing xpdf tools.
 46        """
 47        self._directory = directory
 48
 49    def info(
 50        self,
 51        pdf_path: pathlib.Path,
 52        output_dir: pathlib.Path,
 53        xpdf_args: model.XpdfInfoArgs,
 54    ) -> model.XpdfInfoResult:
 55        """Get information from a pdf file.
 56
 57        Args:
 58            pdf_path: The path to the pdf file.
 59            output_dir: The directory to save pdf info file.
 60            xpdf_args: The program arguments.
 61
 62        Returns:
 63            The pdf file information.
 64        """
 65        # validation
 66        enc = xpdf_args.encoding
 67        utils.validate("text encoding", enc, self.OPTS_TEXT_ENCODING)
 68
 69        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
 70
 71        if not pdf_path.exists():
 72            msg = f"Pdf file not found '{pdf_path}'."
 73            raise utils.LeafFocusError(msg) from FileNotFoundError(pdf_path)
 74
 75        output_file = utils.output_root(pdf_path, "info", output_dir)
 76        output_file = output_file.with_suffix(".json")
 77
 78        if output_file.exists():
 79            logger.info("Loading existing pdf info file.")
 80            with pathlib.Path.open(output_file, encoding="utf-8") as info_file:
 81                data = json.load(info_file)
 82                data["creation_date"] = utils.parse_date(data.get("creation_date"))
 83                data["modification_date"] = utils.parse_date(
 84                    data.get("modification_date")
 85                )
 86                return model.XpdfInfoResult(**data)
 87
 88        logger.info("Extracting pdf info and saving to file.")
 89
 90        # build command
 91        exe_path = utils.select_exe(self._directory / "pdfinfo")
 92        cmd = [str(exe_path)]
 93
 94        cmd_args = self.build_cmd(xpdf_args)
 95
 96        cmd.extend(cmd_args)
 97        cmd.append(str(pdf_path.resolve()))
 98
 99        # execute program
100        result = subprocess.run(  # noqa: S603
101            cmd,
102            capture_output=True,
103            check=True,
104            timeout=30,
105            text=True,
106        )
107        lines = result.stdout.splitlines()
108
109        metadata_line_index, data = self.build_field_metadata(
110            pdf_path,
111            lines,
112        )
113
114        # metadata
115        if metadata_line_index is not None:
116            start = metadata_line_index + 1
117            metadata = "\n".join(lines[start:])
118            root = ElementTree.fromstring(metadata)
119            data["metadata"] = utils.xml_to_element(root).to_dict()
120
121        if output_dir and output_dir.exists():
122            logger.debug("Saving pdf info to '%s'.", output_file)
123            output_file.write_text(
124                json.dumps(data, indent=2, cls=utils.CustomJsonEncoder),
125            )
126
127        return model.XpdfInfoResult(**data)
128
129    def text(
130        self,
131        pdf_path: pathlib.Path,
132        output_path: pathlib.Path,
133        xpdf_args: model.XpdfTextArgs,
134    ) -> model.XpdfTextResult:
135        """Get the text from a pdf file.
136
137        Args:
138            pdf_path: The path to the pdf file.
139            output_path: The directory to save output files.
140            xpdf_args: The pdf program arguments.
141
142        Returns:
143            The result from running the text extraction program.
144        """
145        # validation
146        eol = xpdf_args.line_end_type
147        utils.validate("end of line", eol, self.OPTS_TEXT_LINE_ENDING)
148
149        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
150
151        if not pdf_path.exists():
152            msg = f"Pdf file not found '{pdf_path}'."
153            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
154
155        # build command
156
157        cmd_args = self.build_cmd(xpdf_args)
158
159        output_file = utils.output_root(pdf_path, "output", output_path, cmd_args)
160        output_file = output_file.with_suffix(".txt")
161
162        # check if embedded text file already exists
163        if output_file.exists():
164            logger.info("Loading extracted embedded text from existing file.")
165            return model.XpdfTextResult(
166                stdout=[],
167                stderr=[],
168                output_path=output_file,
169            )
170
171        if logger.isEnabledFor(logging.DEBUG):
172            logger.debug("Did not find expected output file '%s'", output_file.name)
173            logger.debug("Listing items in '%s'", output_file.parent)
174            item_count = 0
175            for item in output_file.parent.iterdir():
176                item_count += 1
177                logger.debug("Found item '%s'", item)
178            logger.debug("Found %s items in dir.", item_count)
179
180        logger.info("Extracting pdf embedded text and saving to file.")
181
182        exe_path = utils.select_exe(self._directory / "pdftotext")
183
184        cmd = [str(exe_path)]
185
186        cmd.extend([*cmd_args, str(pdf_path), str(output_file)])
187
188        # execute program
189        result = subprocess.run(  # noqa: S603
190            cmd,
191            capture_output=True,
192            check=True,
193            timeout=30,
194            text=True,
195        )
196
197        logger.debug("Saving pdf embedded text to '%s'.", output_file)
198
199        return model.XpdfTextResult(
200            stdout=(result.stdout or "").splitlines(),
201            stderr=(result.stderr or "").splitlines(),
202            output_path=output_file,
203        )
204
205    def image(
206        self,
207        pdf_path: pathlib.Path,
208        output_path: pathlib.Path,
209        xpdf_args: model.XpdfImageArgs,
210    ) -> model.XpdfImageResult:
211        """Create images of pdf pages.
212
213        Args:
214            pdf_path: The path to the pdf file.
215            output_path: The directory to save output files.
216            xpdf_args: The program arguments.
217
218        Returns:
219            The  pdf file image info.
220        """
221        # validation
222        rot = xpdf_args.rotation
223        utils.validate("rotation", rot, self.OPTS_IMAGE_ROTATION)
224
225        free_type = xpdf_args.free_type
226        utils.validate("freetype", free_type, self.OPTS_IMAGE_FREETYPE)
227
228        anti_alias = xpdf_args.anti_aliasing
229        utils.validate("anti-aliasing", anti_alias, self.OPTS_IMAGE_ANTI_ALIAS)
230
231        anti_alias_vec = xpdf_args.anti_aliasing
232        utils.validate(
233            "vector anti-aliasing",
234            anti_alias_vec,
235            self.OPTS_IMAGE_VEC_ANTI_ALIAS,
236        )
237
238        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
239
240        if not pdf_path.exists():
241            msg = f"Pdf file not found '{pdf_path}'."
242            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
243
244        logger.info("Saving each pdf page as an image.")
245
246        # build command
247        cmd_args = self.build_cmd(xpdf_args)
248
249        output_type = "page-image"
250
251        # don't include the page limits when building the output prefix
252        xpdf_args.first_page = None
253        xpdf_args.last_page = None
254        output_cmd_args = self.build_cmd(xpdf_args)
255        output_dir = utils.output_root(
256            pdf_path,
257            output_type,
258            output_path,
259            output_cmd_args,
260        )
261
262        for pdf_image_file in output_dir.parent.iterdir():
263            if not pdf_image_file.name.startswith(output_dir.name):
264                continue
265
266            logger.info("Found existing pdf images.")
267
268            output_files = self.find_images(output_dir)
269            return model.XpdfImageResult(
270                stdout=[],
271                stderr=[],
272                output_dir=output_dir,
273                output_files=output_files,
274            )
275
276        exe_path = utils.select_exe(self._directory / "pdftopng")
277        cmd = [str(exe_path)]
278
279        cmd.extend([*cmd_args, str(pdf_path), str(output_dir)])
280
281        # execute program
282        result = subprocess.run(  # noqa: S603
283            cmd,
284            capture_output=True,
285            check=True,
286            timeout=30,
287            text=True,
288        )
289
290        logger.debug("Created pdf page images using prefix '%s'.", output_dir)
291
292        output_files = self.find_images(output_dir)
293
294        return model.XpdfImageResult(
295            stdout=(result.stdout or "").splitlines(),
296            stderr=(result.stderr or "").splitlines(),
297            output_dir=output_dir,
298            output_files=output_files,
299        )
300
301    def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]:
302        """Build the command arguments from a data class."""
303        arg_class = tool_args.__class__
304        cmd_args = []
305        for field in dataclasses.fields(arg_class):
306            name = field.name
307            value = getattr(tool_args, name)
308
309            field_default = field.default
310
311            # TODO: account for default_factory
312
313            # validate the arg config
314            cmd_key = field.metadata.get("leaf_focus", {}).get("cmd")
315            if not cmd_key:
316                msg = f"Args incorrectly configured: missing 'cmd' for '{name}'."
317                raise ValueError(msg)
318
319            cmd_type = field.metadata.get("leaf_focus", {}).get("cmd_type")
320            if not cmd_type:
321                msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'."
322                raise ValueError(msg)
323
324            # add the arg
325            if cmd_type == "bool":
326                if value is not None and value is not True and value is not False:
327                    msg = (
328                        f"Argument '{name}' must be None, True, or False, "
329                        f"not '{value}'."
330                    )
331                    raise ValueError(msg)
332
333                if value is True:
334                    cmd_args.extend([str(cmd_key)])
335
336            elif cmd_type == "single":
337                if (
338                    field_default is None
339                    and value is not None
340                    or field_default != value
341                ):
342                    cmd_args.extend([str(cmd_key), str(value)])
343                else:
344                    # no need to add cmd
345                    pass
346            else:
347                msg = (
348                    f"Argument '{name}' has unknown cmd_type '{cmd_type}'. "
349                    "Expected one of 'bool, single'."
350                )
351                raise ValueError(msg)
352
353        return cmd_args
354
355    def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
356        """Find image files in a directory."""
357        stem_parts = 7
358        stem_digit_parts = 6
359        output_files = []
360        for file_path in output_dir.parent.iterdir():
361            if not file_path.is_file():
362                continue
363            if not file_path.name.startswith(output_dir.stem):
364                continue
365            if file_path.suffix != ".png":
366                continue
367            if len(file_path.stem) < stem_parts:
368                continue
369            if file_path.stem[-stem_parts] != "-":
370                continue
371            if not all(i.isdigit() for i in file_path.stem[-stem_digit_parts:]):
372                continue
373            output_files.append(file_path)
374
375        if not output_files:
376            logger.warning("No page images found.")
377
378        return output_files
379
380    def build_field_metadata(
381        self,
382        pdf_path: pathlib.Path,
383        lines: typing.Iterable[str],
384    ) -> tuple[int | None, dict[str, typing.Any]]:
385        """Build metadata for a field."""
386        fields_map = {
387            field.metadata.get("leaf_focus", {}).get("name"): field
388            for field in dataclasses.fields(model.XpdfInfoResult)
389        }
390        metadata_line_index: int | None = None
391
392        data: dict[str, typing.Any] = {i.name: None for i in fields_map.values()}
393        for index, line in enumerate(lines):
394            if line.startswith("Metadata:"):
395                metadata_line_index = index
396                break
397
398            value: typing.Any = None
399            key, value = line.split(":", maxsplit=1)
400            key = key.strip()
401
402            field = fields_map.get(key)
403            if not field:
404                msg = f"Unknown pdf info key '{key}' value '{value}' in '{pdf_path}'."
405                raise utils.LeafFocusError(msg)
406
407            data_key = field.name
408            if data.get(data_key) is not None:
409                msg = f"Duplicate pdf info key '{key}' in '{pdf_path}'."
410                raise utils.LeafFocusError(msg)
411
412            typing_arg = typing.get_args(field.type)
413            types_str = [str, "str", "str | None"]
414            types_bool = [bool, "bool", "bool | None"]
415            types_int = [int, "int", "int | None"]
416            types_datetime = [datetime, "datetime", "datetime | None"]
417
418            if field.type in types_str or str in typing_arg:
419                value = value.strip()
420            elif field.type in types_datetime or datetime in typing_arg:
421                value = utils.parse_date(value.strip())
422            elif field.type in types_bool or bool in typing_arg:
423                value = value.strip().lower() == "yes"
424            elif field.type in types_int or int in typing_arg:
425                if data_key == "file_size_bytes":
426                    value = value.replace(" bytes", "")
427                value = int(value.strip().lower())
428            else:
429                msg = f"Unknown key '{key}' type '{field.type}'"
430                raise ValueError(msg)
431
432            data[data_key] = value
433
434        return metadata_line_index, data
logger = <Logger leaf_focus.pdf.xpdf (WARNING)>
@beartype
class XpdfProgram:
 24@beartype
 25class XpdfProgram:
 26    """Interact with xpdf tools."""
 27
 28    OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] = (
 29        "Latin1",
 30        "ASCII7",
 31        "Symbol",
 32        "ZapfDingbats",
 33        "UTF-8",
 34        "UCS-2",
 35    )
 36    OPTS_TEXT_LINE_ENDING: tuple[str, str, str] = ("unix", "dos", "mac")
 37    OPTS_IMAGE_ROTATION: tuple[int, int, int, int] = (0, 90, 180, 270)
 38    OPTS_IMAGE_FREETYPE: tuple[str, str] = ("yes", "no")
 39    OPTS_IMAGE_ANTI_ALIAS: tuple[str, str] = ("yes", "no")
 40    OPTS_IMAGE_VEC_ANTI_ALIAS: tuple[str, str] = ("yes", "no")
 41
 42    def __init__(self, directory: pathlib.Path) -> None:
 43        """Create a new xpdf program class to interact with xpdf tools.
 44
 45        Args:
 46            directory: The path to the directory containing xpdf tools.
 47        """
 48        self._directory = directory
 49
 50    def info(
 51        self,
 52        pdf_path: pathlib.Path,
 53        output_dir: pathlib.Path,
 54        xpdf_args: model.XpdfInfoArgs,
 55    ) -> model.XpdfInfoResult:
 56        """Get information from a pdf file.
 57
 58        Args:
 59            pdf_path: The path to the pdf file.
 60            output_dir: The directory to save pdf info file.
 61            xpdf_args: The program arguments.
 62
 63        Returns:
 64            The pdf file information.
 65        """
 66        # validation
 67        enc = xpdf_args.encoding
 68        utils.validate("text encoding", enc, self.OPTS_TEXT_ENCODING)
 69
 70        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
 71
 72        if not pdf_path.exists():
 73            msg = f"Pdf file not found '{pdf_path}'."
 74            raise utils.LeafFocusError(msg) from FileNotFoundError(pdf_path)
 75
 76        output_file = utils.output_root(pdf_path, "info", output_dir)
 77        output_file = output_file.with_suffix(".json")
 78
 79        if output_file.exists():
 80            logger.info("Loading existing pdf info file.")
 81            with pathlib.Path.open(output_file, encoding="utf-8") as info_file:
 82                data = json.load(info_file)
 83                data["creation_date"] = utils.parse_date(data.get("creation_date"))
 84                data["modification_date"] = utils.parse_date(
 85                    data.get("modification_date")
 86                )
 87                return model.XpdfInfoResult(**data)
 88
 89        logger.info("Extracting pdf info and saving to file.")
 90
 91        # build command
 92        exe_path = utils.select_exe(self._directory / "pdfinfo")
 93        cmd = [str(exe_path)]
 94
 95        cmd_args = self.build_cmd(xpdf_args)
 96
 97        cmd.extend(cmd_args)
 98        cmd.append(str(pdf_path.resolve()))
 99
100        # execute program
101        result = subprocess.run(  # noqa: S603
102            cmd,
103            capture_output=True,
104            check=True,
105            timeout=30,
106            text=True,
107        )
108        lines = result.stdout.splitlines()
109
110        metadata_line_index, data = self.build_field_metadata(
111            pdf_path,
112            lines,
113        )
114
115        # metadata
116        if metadata_line_index is not None:
117            start = metadata_line_index + 1
118            metadata = "\n".join(lines[start:])
119            root = ElementTree.fromstring(metadata)
120            data["metadata"] = utils.xml_to_element(root).to_dict()
121
122        if output_dir and output_dir.exists():
123            logger.debug("Saving pdf info to '%s'.", output_file)
124            output_file.write_text(
125                json.dumps(data, indent=2, cls=utils.CustomJsonEncoder),
126            )
127
128        return model.XpdfInfoResult(**data)
129
130    def text(
131        self,
132        pdf_path: pathlib.Path,
133        output_path: pathlib.Path,
134        xpdf_args: model.XpdfTextArgs,
135    ) -> model.XpdfTextResult:
136        """Get the text from a pdf file.
137
138        Args:
139            pdf_path: The path to the pdf file.
140            output_path: The directory to save output files.
141            xpdf_args: The pdf program arguments.
142
143        Returns:
144            The result from running the text extraction program.
145        """
146        # validation
147        eol = xpdf_args.line_end_type
148        utils.validate("end of line", eol, self.OPTS_TEXT_LINE_ENDING)
149
150        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
151
152        if not pdf_path.exists():
153            msg = f"Pdf file not found '{pdf_path}'."
154            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
155
156        # build command
157
158        cmd_args = self.build_cmd(xpdf_args)
159
160        output_file = utils.output_root(pdf_path, "output", output_path, cmd_args)
161        output_file = output_file.with_suffix(".txt")
162
163        # check if embedded text file already exists
164        if output_file.exists():
165            logger.info("Loading extracted embedded text from existing file.")
166            return model.XpdfTextResult(
167                stdout=[],
168                stderr=[],
169                output_path=output_file,
170            )
171
172        if logger.isEnabledFor(logging.DEBUG):
173            logger.debug("Did not find expected output file '%s'", output_file.name)
174            logger.debug("Listing items in '%s'", output_file.parent)
175            item_count = 0
176            for item in output_file.parent.iterdir():
177                item_count += 1
178                logger.debug("Found item '%s'", item)
179            logger.debug("Found %s items in dir.", item_count)
180
181        logger.info("Extracting pdf embedded text and saving to file.")
182
183        exe_path = utils.select_exe(self._directory / "pdftotext")
184
185        cmd = [str(exe_path)]
186
187        cmd.extend([*cmd_args, str(pdf_path), str(output_file)])
188
189        # execute program
190        result = subprocess.run(  # noqa: S603
191            cmd,
192            capture_output=True,
193            check=True,
194            timeout=30,
195            text=True,
196        )
197
198        logger.debug("Saving pdf embedded text to '%s'.", output_file)
199
200        return model.XpdfTextResult(
201            stdout=(result.stdout or "").splitlines(),
202            stderr=(result.stderr or "").splitlines(),
203            output_path=output_file,
204        )
205
206    def image(
207        self,
208        pdf_path: pathlib.Path,
209        output_path: pathlib.Path,
210        xpdf_args: model.XpdfImageArgs,
211    ) -> model.XpdfImageResult:
212        """Create images of pdf pages.
213
214        Args:
215            pdf_path: The path to the pdf file.
216            output_path: The directory to save output files.
217            xpdf_args: The program arguments.
218
219        Returns:
220            The  pdf file image info.
221        """
222        # validation
223        rot = xpdf_args.rotation
224        utils.validate("rotation", rot, self.OPTS_IMAGE_ROTATION)
225
226        free_type = xpdf_args.free_type
227        utils.validate("freetype", free_type, self.OPTS_IMAGE_FREETYPE)
228
229        anti_alias = xpdf_args.anti_aliasing
230        utils.validate("anti-aliasing", anti_alias, self.OPTS_IMAGE_ANTI_ALIAS)
231
232        anti_alias_vec = xpdf_args.anti_aliasing
233        utils.validate(
234            "vector anti-aliasing",
235            anti_alias_vec,
236            self.OPTS_IMAGE_VEC_ANTI_ALIAS,
237        )
238
239        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
240
241        if not pdf_path.exists():
242            msg = f"Pdf file not found '{pdf_path}'."
243            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
244
245        logger.info("Saving each pdf page as an image.")
246
247        # build command
248        cmd_args = self.build_cmd(xpdf_args)
249
250        output_type = "page-image"
251
252        # don't include the page limits when building the output prefix
253        xpdf_args.first_page = None
254        xpdf_args.last_page = None
255        output_cmd_args = self.build_cmd(xpdf_args)
256        output_dir = utils.output_root(
257            pdf_path,
258            output_type,
259            output_path,
260            output_cmd_args,
261        )
262
263        for pdf_image_file in output_dir.parent.iterdir():
264            if not pdf_image_file.name.startswith(output_dir.name):
265                continue
266
267            logger.info("Found existing pdf images.")
268
269            output_files = self.find_images(output_dir)
270            return model.XpdfImageResult(
271                stdout=[],
272                stderr=[],
273                output_dir=output_dir,
274                output_files=output_files,
275            )
276
277        exe_path = utils.select_exe(self._directory / "pdftopng")
278        cmd = [str(exe_path)]
279
280        cmd.extend([*cmd_args, str(pdf_path), str(output_dir)])
281
282        # execute program
283        result = subprocess.run(  # noqa: S603
284            cmd,
285            capture_output=True,
286            check=True,
287            timeout=30,
288            text=True,
289        )
290
291        logger.debug("Created pdf page images using prefix '%s'.", output_dir)
292
293        output_files = self.find_images(output_dir)
294
295        return model.XpdfImageResult(
296            stdout=(result.stdout or "").splitlines(),
297            stderr=(result.stderr or "").splitlines(),
298            output_dir=output_dir,
299            output_files=output_files,
300        )
301
302    def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]:
303        """Build the command arguments from a data class."""
304        arg_class = tool_args.__class__
305        cmd_args = []
306        for field in dataclasses.fields(arg_class):
307            name = field.name
308            value = getattr(tool_args, name)
309
310            field_default = field.default
311
312            # TODO: account for default_factory
313
314            # validate the arg config
315            cmd_key = field.metadata.get("leaf_focus", {}).get("cmd")
316            if not cmd_key:
317                msg = f"Args incorrectly configured: missing 'cmd' for '{name}'."
318                raise ValueError(msg)
319
320            cmd_type = field.metadata.get("leaf_focus", {}).get("cmd_type")
321            if not cmd_type:
322                msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'."
323                raise ValueError(msg)
324
325            # add the arg
326            if cmd_type == "bool":
327                if value is not None and value is not True and value is not False:
328                    msg = (
329                        f"Argument '{name}' must be None, True, or False, "
330                        f"not '{value}'."
331                    )
332                    raise ValueError(msg)
333
334                if value is True:
335                    cmd_args.extend([str(cmd_key)])
336
337            elif cmd_type == "single":
338                if (
339                    field_default is None
340                    and value is not None
341                    or field_default != value
342                ):
343                    cmd_args.extend([str(cmd_key), str(value)])
344                else:
345                    # no need to add cmd
346                    pass
347            else:
348                msg = (
349                    f"Argument '{name}' has unknown cmd_type '{cmd_type}'. "
350                    "Expected one of 'bool, single'."
351                )
352                raise ValueError(msg)
353
354        return cmd_args
355
356    def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
357        """Find image files in a directory."""
358        stem_parts = 7
359        stem_digit_parts = 6
360        output_files = []
361        for file_path in output_dir.parent.iterdir():
362            if not file_path.is_file():
363                continue
364            if not file_path.name.startswith(output_dir.stem):
365                continue
366            if file_path.suffix != ".png":
367                continue
368            if len(file_path.stem) < stem_parts:
369                continue
370            if file_path.stem[-stem_parts] != "-":
371                continue
372            if not all(i.isdigit() for i in file_path.stem[-stem_digit_parts:]):
373                continue
374            output_files.append(file_path)
375
376        if not output_files:
377            logger.warning("No page images found.")
378
379        return output_files
380
381    def build_field_metadata(
382        self,
383        pdf_path: pathlib.Path,
384        lines: typing.Iterable[str],
385    ) -> tuple[int | None, dict[str, typing.Any]]:
386        """Build metadata for a field."""
387        fields_map = {
388            field.metadata.get("leaf_focus", {}).get("name"): field
389            for field in dataclasses.fields(model.XpdfInfoResult)
390        }
391        metadata_line_index: int | None = None
392
393        data: dict[str, typing.Any] = {i.name: None for i in fields_map.values()}
394        for index, line in enumerate(lines):
395            if line.startswith("Metadata:"):
396                metadata_line_index = index
397                break
398
399            value: typing.Any = None
400            key, value = line.split(":", maxsplit=1)
401            key = key.strip()
402
403            field = fields_map.get(key)
404            if not field:
405                msg = f"Unknown pdf info key '{key}' value '{value}' in '{pdf_path}'."
406                raise utils.LeafFocusError(msg)
407
408            data_key = field.name
409            if data.get(data_key) is not None:
410                msg = f"Duplicate pdf info key '{key}' in '{pdf_path}'."
411                raise utils.LeafFocusError(msg)
412
413            typing_arg = typing.get_args(field.type)
414            types_str = [str, "str", "str | None"]
415            types_bool = [bool, "bool", "bool | None"]
416            types_int = [int, "int", "int | None"]
417            types_datetime = [datetime, "datetime", "datetime | None"]
418
419            if field.type in types_str or str in typing_arg:
420                value = value.strip()
421            elif field.type in types_datetime or datetime in typing_arg:
422                value = utils.parse_date(value.strip())
423            elif field.type in types_bool or bool in typing_arg:
424                value = value.strip().lower() == "yes"
425            elif field.type in types_int or int in typing_arg:
426                if data_key == "file_size_bytes":
427                    value = value.replace(" bytes", "")
428                value = int(value.strip().lower())
429            else:
430                msg = f"Unknown key '{key}' type '{field.type}'"
431                raise ValueError(msg)
432
433            data[data_key] = value
434
435        return metadata_line_index, data

Interact with xpdf tools.

XpdfProgram(directory: pathlib.Path)
42    def __init__(self, directory: pathlib.Path) -> None:
43        """Create a new xpdf program class to interact with xpdf tools.
44
45        Args:
46            directory: The path to the directory containing xpdf tools.
47        """
48        self._directory = directory

Create a new xpdf program class to interact with xpdf tools.

Arguments:
  • directory: The path to the directory containing xpdf tools.
OPTS_TEXT_ENCODING: tuple[str, str, str, str, str, str] = ('Latin1', 'ASCII7', 'Symbol', 'ZapfDingbats', 'UTF-8', 'UCS-2')
OPTS_TEXT_LINE_ENDING: tuple[str, str, str] = ('unix', 'dos', 'mac')
OPTS_IMAGE_ROTATION: tuple[int, int, int, int] = (0, 90, 180, 270)
OPTS_IMAGE_FREETYPE: tuple[str, str] = ('yes', 'no')
OPTS_IMAGE_ANTI_ALIAS: tuple[str, str] = ('yes', 'no')
OPTS_IMAGE_VEC_ANTI_ALIAS: tuple[str, str] = ('yes', 'no')
def info( self, pdf_path: pathlib.Path, output_dir: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfInfoArgs) -> leaf_focus.pdf.model.XpdfInfoResult:
 50    def info(
 51        self,
 52        pdf_path: pathlib.Path,
 53        output_dir: pathlib.Path,
 54        xpdf_args: model.XpdfInfoArgs,
 55    ) -> model.XpdfInfoResult:
 56        """Get information from a pdf file.
 57
 58        Args:
 59            pdf_path: The path to the pdf file.
 60            output_dir: The directory to save pdf info file.
 61            xpdf_args: The program arguments.
 62
 63        Returns:
 64            The pdf file information.
 65        """
 66        # validation
 67        enc = xpdf_args.encoding
 68        utils.validate("text encoding", enc, self.OPTS_TEXT_ENCODING)
 69
 70        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
 71
 72        if not pdf_path.exists():
 73            msg = f"Pdf file not found '{pdf_path}'."
 74            raise utils.LeafFocusError(msg) from FileNotFoundError(pdf_path)
 75
 76        output_file = utils.output_root(pdf_path, "info", output_dir)
 77        output_file = output_file.with_suffix(".json")
 78
 79        if output_file.exists():
 80            logger.info("Loading existing pdf info file.")
 81            with pathlib.Path.open(output_file, encoding="utf-8") as info_file:
 82                data = json.load(info_file)
 83                data["creation_date"] = utils.parse_date(data.get("creation_date"))
 84                data["modification_date"] = utils.parse_date(
 85                    data.get("modification_date")
 86                )
 87                return model.XpdfInfoResult(**data)
 88
 89        logger.info("Extracting pdf info and saving to file.")
 90
 91        # build command
 92        exe_path = utils.select_exe(self._directory / "pdfinfo")
 93        cmd = [str(exe_path)]
 94
 95        cmd_args = self.build_cmd(xpdf_args)
 96
 97        cmd.extend(cmd_args)
 98        cmd.append(str(pdf_path.resolve()))
 99
100        # execute program
101        result = subprocess.run(  # noqa: S603
102            cmd,
103            capture_output=True,
104            check=True,
105            timeout=30,
106            text=True,
107        )
108        lines = result.stdout.splitlines()
109
110        metadata_line_index, data = self.build_field_metadata(
111            pdf_path,
112            lines,
113        )
114
115        # metadata
116        if metadata_line_index is not None:
117            start = metadata_line_index + 1
118            metadata = "\n".join(lines[start:])
119            root = ElementTree.fromstring(metadata)
120            data["metadata"] = utils.xml_to_element(root).to_dict()
121
122        if output_dir and output_dir.exists():
123            logger.debug("Saving pdf info to '%s'.", output_file)
124            output_file.write_text(
125                json.dumps(data, indent=2, cls=utils.CustomJsonEncoder),
126            )
127
128        return model.XpdfInfoResult(**data)

Get information from a pdf file.

Arguments:
  • pdf_path: The path to the pdf file.
  • output_dir: The directory to save pdf info file.
  • xpdf_args: The program arguments.
Returns:

The pdf file information.

def text( self, pdf_path: pathlib.Path, output_path: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfTextArgs) -> leaf_focus.pdf.model.XpdfTextResult:
130    def text(
131        self,
132        pdf_path: pathlib.Path,
133        output_path: pathlib.Path,
134        xpdf_args: model.XpdfTextArgs,
135    ) -> model.XpdfTextResult:
136        """Get the text from a pdf file.
137
138        Args:
139            pdf_path: The path to the pdf file.
140            output_path: The directory to save output files.
141            xpdf_args: The pdf program arguments.
142
143        Returns:
144            The result from running the text extraction program.
145        """
146        # validation
147        eol = xpdf_args.line_end_type
148        utils.validate("end of line", eol, self.OPTS_TEXT_LINE_ENDING)
149
150        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
151
152        if not pdf_path.exists():
153            msg = f"Pdf file not found '{pdf_path}'."
154            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
155
156        # build command
157
158        cmd_args = self.build_cmd(xpdf_args)
159
160        output_file = utils.output_root(pdf_path, "output", output_path, cmd_args)
161        output_file = output_file.with_suffix(".txt")
162
163        # check if embedded text file already exists
164        if output_file.exists():
165            logger.info("Loading extracted embedded text from existing file.")
166            return model.XpdfTextResult(
167                stdout=[],
168                stderr=[],
169                output_path=output_file,
170            )
171
172        if logger.isEnabledFor(logging.DEBUG):
173            logger.debug("Did not find expected output file '%s'", output_file.name)
174            logger.debug("Listing items in '%s'", output_file.parent)
175            item_count = 0
176            for item in output_file.parent.iterdir():
177                item_count += 1
178                logger.debug("Found item '%s'", item)
179            logger.debug("Found %s items in dir.", item_count)
180
181        logger.info("Extracting pdf embedded text and saving to file.")
182
183        exe_path = utils.select_exe(self._directory / "pdftotext")
184
185        cmd = [str(exe_path)]
186
187        cmd.extend([*cmd_args, str(pdf_path), str(output_file)])
188
189        # execute program
190        result = subprocess.run(  # noqa: S603
191            cmd,
192            capture_output=True,
193            check=True,
194            timeout=30,
195            text=True,
196        )
197
198        logger.debug("Saving pdf embedded text to '%s'.", output_file)
199
200        return model.XpdfTextResult(
201            stdout=(result.stdout or "").splitlines(),
202            stderr=(result.stderr or "").splitlines(),
203            output_path=output_file,
204        )

Get the text from a pdf file.

Arguments:
  • pdf_path: The path to the pdf file.
  • output_path: The directory to save output files.
  • xpdf_args: The pdf program arguments.
Returns:

The result from running the text extraction program.

def image( self, pdf_path: pathlib.Path, output_path: pathlib.Path, xpdf_args: leaf_focus.pdf.model.XpdfImageArgs) -> leaf_focus.pdf.model.XpdfImageResult:
206    def image(
207        self,
208        pdf_path: pathlib.Path,
209        output_path: pathlib.Path,
210        xpdf_args: model.XpdfImageArgs,
211    ) -> model.XpdfImageResult:
212        """Create images of pdf pages.
213
214        Args:
215            pdf_path: The path to the pdf file.
216            output_path: The directory to save output files.
217            xpdf_args: The program arguments.
218
219        Returns:
220            The  pdf file image info.
221        """
222        # validation
223        rot = xpdf_args.rotation
224        utils.validate("rotation", rot, self.OPTS_IMAGE_ROTATION)
225
226        free_type = xpdf_args.free_type
227        utils.validate("freetype", free_type, self.OPTS_IMAGE_FREETYPE)
228
229        anti_alias = xpdf_args.anti_aliasing
230        utils.validate("anti-aliasing", anti_alias, self.OPTS_IMAGE_ANTI_ALIAS)
231
232        anti_alias_vec = xpdf_args.anti_aliasing
233        utils.validate(
234            "vector anti-aliasing",
235            anti_alias_vec,
236            self.OPTS_IMAGE_VEC_ANTI_ALIAS,
237        )
238
239        utils.validate_pages(xpdf_args.first_page, xpdf_args.last_page)
240
241        if not pdf_path.exists():
242            msg = f"Pdf file not found '{pdf_path}'."
243            raise utils.LeafFocusError(msg) from FileNotFoundError(str(pdf_path))
244
245        logger.info("Saving each pdf page as an image.")
246
247        # build command
248        cmd_args = self.build_cmd(xpdf_args)
249
250        output_type = "page-image"
251
252        # don't include the page limits when building the output prefix
253        xpdf_args.first_page = None
254        xpdf_args.last_page = None
255        output_cmd_args = self.build_cmd(xpdf_args)
256        output_dir = utils.output_root(
257            pdf_path,
258            output_type,
259            output_path,
260            output_cmd_args,
261        )
262
263        for pdf_image_file in output_dir.parent.iterdir():
264            if not pdf_image_file.name.startswith(output_dir.name):
265                continue
266
267            logger.info("Found existing pdf images.")
268
269            output_files = self.find_images(output_dir)
270            return model.XpdfImageResult(
271                stdout=[],
272                stderr=[],
273                output_dir=output_dir,
274                output_files=output_files,
275            )
276
277        exe_path = utils.select_exe(self._directory / "pdftopng")
278        cmd = [str(exe_path)]
279
280        cmd.extend([*cmd_args, str(pdf_path), str(output_dir)])
281
282        # execute program
283        result = subprocess.run(  # noqa: S603
284            cmd,
285            capture_output=True,
286            check=True,
287            timeout=30,
288            text=True,
289        )
290
291        logger.debug("Created pdf page images using prefix '%s'.", output_dir)
292
293        output_files = self.find_images(output_dir)
294
295        return model.XpdfImageResult(
296            stdout=(result.stdout or "").splitlines(),
297            stderr=(result.stderr or "").splitlines(),
298            output_dir=output_dir,
299            output_files=output_files,
300        )

Create images of pdf pages.

Arguments:
  • pdf_path: The path to the pdf file.
  • output_path: The directory to save output files.
  • xpdf_args: The program arguments.
Returns:

The pdf file image info.

def build_cmd(self, tool_args: leaf_focus.pdf.model.XpdfArgs) -> list[str]:
302    def build_cmd(self, tool_args: model.XpdfArgs) -> list[str]:
303        """Build the command arguments from a data class."""
304        arg_class = tool_args.__class__
305        cmd_args = []
306        for field in dataclasses.fields(arg_class):
307            name = field.name
308            value = getattr(tool_args, name)
309
310            field_default = field.default
311
312            # TODO: account for default_factory
313
314            # validate the arg config
315            cmd_key = field.metadata.get("leaf_focus", {}).get("cmd")
316            if not cmd_key:
317                msg = f"Args incorrectly configured: missing 'cmd' for '{name}'."
318                raise ValueError(msg)
319
320            cmd_type = field.metadata.get("leaf_focus", {}).get("cmd_type")
321            if not cmd_type:
322                msg = f"Args incorrectly configured: missing 'cmd_type' for '{name}'."
323                raise ValueError(msg)
324
325            # add the arg
326            if cmd_type == "bool":
327                if value is not None and value is not True and value is not False:
328                    msg = (
329                        f"Argument '{name}' must be None, True, or False, "
330                        f"not '{value}'."
331                    )
332                    raise ValueError(msg)
333
334                if value is True:
335                    cmd_args.extend([str(cmd_key)])
336
337            elif cmd_type == "single":
338                if (
339                    field_default is None
340                    and value is not None
341                    or field_default != value
342                ):
343                    cmd_args.extend([str(cmd_key), str(value)])
344                else:
345                    # no need to add cmd
346                    pass
347            else:
348                msg = (
349                    f"Argument '{name}' has unknown cmd_type '{cmd_type}'. "
350                    "Expected one of 'bool, single'."
351                )
352                raise ValueError(msg)
353
354        return cmd_args

Build the command arguments from a data class.

def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
356    def find_images(self, output_dir: pathlib.Path) -> list[pathlib.Path]:
357        """Find image files in a directory."""
358        stem_parts = 7
359        stem_digit_parts = 6
360        output_files = []
361        for file_path in output_dir.parent.iterdir():
362            if not file_path.is_file():
363                continue
364            if not file_path.name.startswith(output_dir.stem):
365                continue
366            if file_path.suffix != ".png":
367                continue
368            if len(file_path.stem) < stem_parts:
369                continue
370            if file_path.stem[-stem_parts] != "-":
371                continue
372            if not all(i.isdigit() for i in file_path.stem[-stem_digit_parts:]):
373                continue
374            output_files.append(file_path)
375
376        if not output_files:
377            logger.warning("No page images found.")
378
379        return output_files

Find image files in a directory.

def build_field_metadata( self, pdf_path: pathlib.Path, lines: Iterable[str]) -> tuple[int | None, dict[str, typing.Any]]:
381    def build_field_metadata(
382        self,
383        pdf_path: pathlib.Path,
384        lines: typing.Iterable[str],
385    ) -> tuple[int | None, dict[str, typing.Any]]:
386        """Build metadata for a field."""
387        fields_map = {
388            field.metadata.get("leaf_focus", {}).get("name"): field
389            for field in dataclasses.fields(model.XpdfInfoResult)
390        }
391        metadata_line_index: int | None = None
392
393        data: dict[str, typing.Any] = {i.name: None for i in fields_map.values()}
394        for index, line in enumerate(lines):
395            if line.startswith("Metadata:"):
396                metadata_line_index = index
397                break
398
399            value: typing.Any = None
400            key, value = line.split(":", maxsplit=1)
401            key = key.strip()
402
403            field = fields_map.get(key)
404            if not field:
405                msg = f"Unknown pdf info key '{key}' value '{value}' in '{pdf_path}'."
406                raise utils.LeafFocusError(msg)
407
408            data_key = field.name
409            if data.get(data_key) is not None:
410                msg = f"Duplicate pdf info key '{key}' in '{pdf_path}'."
411                raise utils.LeafFocusError(msg)
412
413            typing_arg = typing.get_args(field.type)
414            types_str = [str, "str", "str | None"]
415            types_bool = [bool, "bool", "bool | None"]
416            types_int = [int, "int", "int | None"]
417            types_datetime = [datetime, "datetime", "datetime | None"]
418
419            if field.type in types_str or str in typing_arg:
420                value = value.strip()
421            elif field.type in types_datetime or datetime in typing_arg:
422                value = utils.parse_date(value.strip())
423            elif field.type in types_bool or bool in typing_arg:
424                value = value.strip().lower() == "yes"
425            elif field.type in types_int or int in typing_arg:
426                if data_key == "file_size_bytes":
427                    value = value.replace(" bytes", "")
428                value = int(value.strip().lower())
429            else:
430                msg = f"Unknown key '{key}' type '{field.type}'"
431                raise ValueError(msg)
432
433            data[data_key] = value
434
435        return metadata_line_index, data

Build metadata for a field.