[postprocessor:metadata] add 'event' and 'filename' options

pull/1195/head
Mike Fährmann 4 years ago
parent 9c3568c397
commit ca59bd691c
No known key found for this signature in database
GPG Key ID: 5680CA389D365A88

@ -6,7 +6,7 @@
# it under the terms of the GNU General Public License version 2 as # it under the terms of the GNU General Public License version 2 as
# published by the Free Software Foundation. # published by the Free Software Foundation.
"""Write metadata to JSON files""" """Write metadata to external files"""
from .common import PostProcessor from .common import PostProcessor
from .. import util from .. import util
@ -24,7 +24,7 @@ class MetadataPP(PostProcessor):
cfmt = options.get("content-format") or options.get("format") cfmt = options.get("content-format") or options.get("format")
if isinstance(cfmt, list): if isinstance(cfmt, list):
cfmt = "\n".join(cfmt) + "\n" cfmt = "\n".join(cfmt) + "\n"
self.contentfmt = util.Formatter(cfmt).format_map self._content_fmt = util.Formatter(cfmt).format_map
ext = "txt" ext = "txt"
elif mode == "tags": elif mode == "tags":
self.write = self._write_tags self.write = self._write_tags
@ -39,47 +39,64 @@ class MetadataPP(PostProcessor):
if directory: if directory:
self._directory = self._directory_custom self._directory = self._directory_custom
sep = os.sep + (os.altsep or "") sep = os.sep + (os.altsep or "")
self.metadir = directory.rstrip(sep) + os.sep self._metadir = directory.rstrip(sep) + os.sep
filename = options.get("filename")
extfmt = options.get("extension-format") extfmt = options.get("extension-format")
if extfmt: if filename:
self._filename = self._filename_custom self._filename = self._filename_custom
self.extfmt = util.Formatter(extfmt).format_map self._filename_fmt = util.Formatter(filename).format_map
elif extfmt:
self._filename = self._filename_extfmt
self._extension_fmt = util.Formatter(extfmt).format_map
else: else:
self.extension = options.get("extension", ext) self.extension = options.get("extension", ext)
event = "metadata" if options.get("bypost") else "file" events = options.get("event")
job.hooks[event].append(self.run) if events is None:
events = ("file",)
elif isinstance(events, str):
events = events.split(",")
for event in events:
job.hooks[event].append(self.run)
def run(self, pathfmt): def run(self, pathfmt):
path = self._directory(pathfmt) + self._filename(pathfmt) directory = self._directory(pathfmt)
with open(path, "w", encoding="utf-8") as file: path = directory + self._filename(pathfmt)
self.write(file, pathfmt.kwdict)
try:
with open(path, "w", encoding="utf-8") as fp:
self.write(fp, pathfmt.kwdict)
except FileNotFoundError:
os.makedirs(directory, exist_ok=True)
with open(path, "w", encoding="utf-8") as fp:
self.write(fp, pathfmt.kwdict)
def _directory(self, pathfmt): def _directory(self, pathfmt):
return pathfmt.realdirectory return pathfmt.realdirectory
def _directory_custom(self, pathfmt): def _directory_custom(self, pathfmt):
directory = os.path.join(pathfmt.realdirectory, self.metadir) return os.path.join(pathfmt.realdirectory, self._metadir)
os.makedirs(directory, exist_ok=True)
return directory
def _filename(self, pathfmt): def _filename(self, pathfmt):
return pathfmt.filename + "." + self.extension return (pathfmt.filename or "metadata") + "." + self.extension
def _filename_custom(self, pathfmt): def _filename_custom(self, pathfmt):
return self._filename_fmt(pathfmt.kwdict)
def _filename_extfmt(self, pathfmt):
kwdict = pathfmt.kwdict kwdict = pathfmt.kwdict
ext = kwdict["extension"] ext = kwdict["extension"]
kwdict["extension"] = pathfmt.extension kwdict["extension"] = pathfmt.extension
kwdict["extension"] = pathfmt.prefix + self.extfmt(kwdict) kwdict["extension"] = pathfmt.prefix + self._extension_fmt(kwdict)
filename = pathfmt.build_filename() filename = pathfmt.build_filename()
kwdict["extension"] = ext kwdict["extension"] = ext
return filename return filename
def _write_custom(self, file, kwdict): def _write_custom(self, fp, kwdict):
file.write(self.contentfmt(kwdict)) fp.write(self._content_fmt(kwdict))
def _write_tags(self, file, kwdict): def _write_tags(self, fp, kwdict):
tags = kwdict.get("tags") or kwdict.get("tag_string") tags = kwdict.get("tags") or kwdict.get("tag_string")
if not tags: if not tags:
@ -91,11 +108,10 @@ class MetadataPP(PostProcessor):
taglist = tags.split(" ") taglist = tags.split(" ")
tags = taglist tags = taglist
file.write("\n".join(tags)) fp.write("\n".join(tags) + "\n")
file.write("\n")
def _write_json(self, file, kwdict): def _write_json(self, fp, kwdict):
util.dump_json(util.filter_dict(kwdict), file, self.ascii, self.indent) util.dump_json(util.filter_dict(kwdict), fp, self.ascii, self.indent)
__postprocessor__ = MetadataPP __postprocessor__ = MetadataPP

@ -244,7 +244,7 @@ class MetadataTest(BasePostprocessorTest):
pp = self._create(pp_info, {"foo": "bar"}) pp = self._create(pp_info, {"foo": "bar"})
self.assertEqual(pp.write, pp._write_custom) self.assertEqual(pp.write, pp._write_custom)
self.assertEqual(pp.extension, "txt") self.assertEqual(pp.extension, "txt")
self.assertTrue(pp.contentfmt) self.assertTrue(pp._content_fmt)
with patch("builtins.open", mock_open()) as m: with patch("builtins.open", mock_open()) as m:
self._trigger() self._trigger()
@ -261,7 +261,7 @@ class MetadataTest(BasePostprocessorTest):
"extension-format": "json", "extension-format": "json",
}) })
self.assertEqual(pp._filename, pp._filename_custom) self.assertEqual(pp._filename, pp._filename_extfmt)
with patch("builtins.open", mock_open()) as m: with patch("builtins.open", mock_open()) as m:
self._trigger() self._trigger()
@ -304,6 +304,18 @@ class MetadataTest(BasePostprocessorTest):
path = self.pathfmt.realdirectory + "metadata/file.json" path = self.pathfmt.realdirectory + "metadata/file.json"
m.assert_called_once_with(path, "w", encoding="utf-8") m.assert_called_once_with(path, "w", encoding="utf-8")
def test_metadata_filename(self):
self._create({
"filename" : "{category}_{filename}_meta.data",
"extension-format": "json",
})
with patch("builtins.open", mock_open()) as m:
self._trigger()
path = self.pathfmt.realdirectory + "test_file_meta.data"
m.assert_called_once_with(path, "w", encoding="utf-8")
@staticmethod @staticmethod
def _output(mock): def _output(mock):
return "".join( return "".join(

Loading…
Cancel
Save