commit
d4ed3a35b8
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,42 @@
|
|||||||
|
defmodule Mix.Tasks.Pleroma.Docs do
|
||||||
|
use Mix.Task
|
||||||
|
import Mix.Pleroma
|
||||||
|
|
||||||
|
@shortdoc "Generates docs from descriptions.exs"
|
||||||
|
@moduledoc """
|
||||||
|
Generates docs from `descriptions.exs`.
|
||||||
|
|
||||||
|
Supports two formats: `markdown` and `json`.
|
||||||
|
|
||||||
|
## Generate Markdown docs
|
||||||
|
|
||||||
|
`mix pleroma.docs`
|
||||||
|
|
||||||
|
## Generate JSON docs
|
||||||
|
|
||||||
|
`mix pleroma.docs json`
|
||||||
|
"""
|
||||||
|
|
||||||
|
def run(["json"]) do
|
||||||
|
do_run(Pleroma.Docs.JSON)
|
||||||
|
end
|
||||||
|
|
||||||
|
def run(_) do
|
||||||
|
do_run(Pleroma.Docs.Markdown)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_run(implementation) do
|
||||||
|
start_pleroma()
|
||||||
|
|
||||||
|
with {descriptions, _paths} <- Mix.Config.eval!("config/description.exs"),
|
||||||
|
{:ok, file_path} <-
|
||||||
|
Pleroma.Docs.Generator.process(
|
||||||
|
implementation,
|
||||||
|
descriptions[:pleroma][:config_description]
|
||||||
|
) do
|
||||||
|
type = if implementation == Pleroma.Docs.Markdown, do: "Markdown", else: "JSON"
|
||||||
|
|
||||||
|
Mix.shell().info([:green, "#{type} docs successfully generated to #{file_path}."])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,63 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Activity.Ir.Topics do
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Web.ActivityPub.Visibility
|
||||||
|
|
||||||
|
def get_activity_topics(activity) do
|
||||||
|
activity
|
||||||
|
|> Object.normalize()
|
||||||
|
|> generate_topics(activity)
|
||||||
|
|> List.flatten()
|
||||||
|
end
|
||||||
|
|
||||||
|
defp generate_topics(%{data: %{"type" => "Answer"}}, _) do
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
|
||||||
|
defp generate_topics(object, activity) do
|
||||||
|
["user", "list"] ++ visibility_tags(object, activity)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp visibility_tags(object, activity) do
|
||||||
|
case Visibility.get_visibility(activity) do
|
||||||
|
"public" ->
|
||||||
|
if activity.local do
|
||||||
|
["public", "public:local"]
|
||||||
|
else
|
||||||
|
["public"]
|
||||||
|
end
|
||||||
|
|> item_creation_tags(object, activity)
|
||||||
|
|
||||||
|
"direct" ->
|
||||||
|
["direct"]
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
[]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp item_creation_tags(tags, %{data: %{"type" => "Create"}} = object, activity) do
|
||||||
|
tags ++ hashtags_to_topics(object) ++ attachment_topics(object, activity)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp item_creation_tags(tags, _, _) do
|
||||||
|
tags
|
||||||
|
end
|
||||||
|
|
||||||
|
defp hashtags_to_topics(%{data: %{"tag" => tags}}) do
|
||||||
|
tags
|
||||||
|
|> Enum.filter(&is_bitstring(&1))
|
||||||
|
|> Enum.map(fn tag -> "hashtag:" <> tag end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp hashtags_to_topics(_), do: []
|
||||||
|
|
||||||
|
defp attachment_topics(%{data: %{"attachment" => []}}, _act), do: []
|
||||||
|
|
||||||
|
defp attachment_topics(_object, %{local: true}), do: ["public:media", "public:local:media"]
|
||||||
|
|
||||||
|
defp attachment_topics(_object, _act), do: ["public:media"]
|
||||||
|
end
|
@ -0,0 +1,51 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Delivery do
|
||||||
|
use Ecto.Schema
|
||||||
|
|
||||||
|
alias Pleroma.Delivery
|
||||||
|
alias Pleroma.FlakeId
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.Repo
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.User
|
||||||
|
|
||||||
|
import Ecto.Changeset
|
||||||
|
import Ecto.Query
|
||||||
|
|
||||||
|
schema "deliveries" do
|
||||||
|
belongs_to(:user, User, type: FlakeId)
|
||||||
|
belongs_to(:object, Object)
|
||||||
|
end
|
||||||
|
|
||||||
|
def changeset(delivery, params \\ %{}) do
|
||||||
|
delivery
|
||||||
|
|> cast(params, [:user_id, :object_id])
|
||||||
|
|> validate_required([:user_id, :object_id])
|
||||||
|
|> foreign_key_constraint(:object_id)
|
||||||
|
|> foreign_key_constraint(:user_id)
|
||||||
|
|> unique_constraint(:user_id, name: :deliveries_user_id_object_id_index)
|
||||||
|
end
|
||||||
|
|
||||||
|
def create(object_id, user_id) do
|
||||||
|
%Delivery{}
|
||||||
|
|> changeset(%{user_id: user_id, object_id: object_id})
|
||||||
|
|> Repo.insert(on_conflict: :nothing)
|
||||||
|
end
|
||||||
|
|
||||||
|
def get(object_id, user_id) do
|
||||||
|
from(d in Delivery, where: d.user_id == ^user_id and d.object_id == ^object_id)
|
||||||
|
|> Repo.one()
|
||||||
|
end
|
||||||
|
|
||||||
|
# A hack because user delete activities have a fake id for whatever reason
|
||||||
|
# TODO: Get rid of this
|
||||||
|
def delete_all_by_object_id("pleroma:fake_object_id"), do: {0, []}
|
||||||
|
|
||||||
|
def delete_all_by_object_id(object_id) do
|
||||||
|
from(d in Delivery, where: d.object_id == ^object_id)
|
||||||
|
|> Repo.delete_all()
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,73 @@
|
|||||||
|
defmodule Pleroma.Docs.Generator do
|
||||||
|
@callback process(keyword()) :: {:ok, String.t()}
|
||||||
|
|
||||||
|
@spec process(module(), keyword()) :: {:ok, String.t()}
|
||||||
|
def process(implementation, descriptions) do
|
||||||
|
implementation.process(descriptions)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec uploaders_list() :: [module()]
|
||||||
|
def uploaders_list do
|
||||||
|
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||||
|
|
||||||
|
Enum.filter(modules, fn module ->
|
||||||
|
name_as_list = Module.split(module)
|
||||||
|
|
||||||
|
List.starts_with?(name_as_list, ["Pleroma", "Uploaders"]) and
|
||||||
|
List.last(name_as_list) != "Uploader"
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec filters_list() :: [module()]
|
||||||
|
def filters_list do
|
||||||
|
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||||
|
|
||||||
|
Enum.filter(modules, fn module ->
|
||||||
|
name_as_list = Module.split(module)
|
||||||
|
|
||||||
|
List.starts_with?(name_as_list, ["Pleroma", "Upload", "Filter"])
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec mrf_list() :: [module()]
|
||||||
|
def mrf_list do
|
||||||
|
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||||
|
|
||||||
|
Enum.filter(modules, fn module ->
|
||||||
|
name_as_list = Module.split(module)
|
||||||
|
|
||||||
|
List.starts_with?(name_as_list, ["Pleroma", "Web", "ActivityPub", "MRF"]) and
|
||||||
|
length(name_as_list) > 4
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec richmedia_parsers() :: [module()]
|
||||||
|
def richmedia_parsers do
|
||||||
|
{:ok, modules} = :application.get_key(:pleroma, :modules)
|
||||||
|
|
||||||
|
Enum.filter(modules, fn module ->
|
||||||
|
name_as_list = Module.split(module)
|
||||||
|
|
||||||
|
List.starts_with?(name_as_list, ["Pleroma", "Web", "RichMedia", "Parsers"]) and
|
||||||
|
length(name_as_list) == 5
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defimpl Jason.Encoder, for: Tuple do
|
||||||
|
def encode(tuple, opts) do
|
||||||
|
Jason.Encode.list(Tuple.to_list(tuple), opts)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defimpl Jason.Encoder, for: [Regex, Function] do
|
||||||
|
def encode(term, opts) do
|
||||||
|
Jason.Encode.string(inspect(term), opts)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defimpl String.Chars, for: Regex do
|
||||||
|
def to_string(term) do
|
||||||
|
inspect(term)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,20 @@
|
|||||||
|
defmodule Pleroma.Docs.JSON do
|
||||||
|
@behaviour Pleroma.Docs.Generator
|
||||||
|
|
||||||
|
@spec process(keyword()) :: {:ok, String.t()}
|
||||||
|
def process(descriptions) do
|
||||||
|
config_path = "docs/generate_config.json"
|
||||||
|
|
||||||
|
with {:ok, file} <- File.open(config_path, [:write]),
|
||||||
|
json <- generate_json(descriptions),
|
||||||
|
:ok <- IO.write(file, json),
|
||||||
|
:ok <- File.close(file) do
|
||||||
|
{:ok, config_path}
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec generate_json([keyword()]) :: String.t()
|
||||||
|
def generate_json(descriptions) do
|
||||||
|
Jason.encode!(descriptions)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,88 @@
|
|||||||
|
defmodule Pleroma.Docs.Markdown do
|
||||||
|
@behaviour Pleroma.Docs.Generator
|
||||||
|
|
||||||
|
@spec process(keyword()) :: {:ok, String.t()}
|
||||||
|
def process(descriptions) do
|
||||||
|
config_path = "docs/generated_config.md"
|
||||||
|
{:ok, file} = File.open(config_path, [:utf8, :write])
|
||||||
|
IO.write(file, "# Generated configuration\n")
|
||||||
|
IO.write(file, "Date of generation: #{Date.utc_today()}\n\n")
|
||||||
|
|
||||||
|
IO.write(
|
||||||
|
file,
|
||||||
|
"This file describe the configuration, it is recommended to edit the relevant `*.secret.exs` file instead of the others founds in the ``config`` directory.\n\n" <>
|
||||||
|
"If you run Pleroma with ``MIX_ENV=prod`` the file is ``prod.secret.exs``, otherwise it is ``dev.secret.exs``.\n\n"
|
||||||
|
)
|
||||||
|
|
||||||
|
for group <- descriptions do
|
||||||
|
if is_nil(group[:key]) do
|
||||||
|
IO.write(file, "## #{inspect(group[:group])}\n")
|
||||||
|
else
|
||||||
|
IO.write(file, "## #{inspect(group[:key])}\n")
|
||||||
|
end
|
||||||
|
|
||||||
|
IO.write(file, "#{group[:description]}\n")
|
||||||
|
|
||||||
|
for child <- group[:children] || [] do
|
||||||
|
print_child_header(file, child)
|
||||||
|
|
||||||
|
print_suggestions(file, child[:suggestions])
|
||||||
|
|
||||||
|
if child[:children] do
|
||||||
|
for subchild <- child[:children] do
|
||||||
|
print_child_header(file, subchild)
|
||||||
|
|
||||||
|
print_suggestions(file, subchild[:suggestions])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
IO.write(file, "\n")
|
||||||
|
end
|
||||||
|
|
||||||
|
:ok = File.close(file)
|
||||||
|
{:ok, config_path}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_child_header(file, %{key: key, type: type, description: description} = _child) do
|
||||||
|
IO.write(
|
||||||
|
file,
|
||||||
|
"- `#{inspect(key)}` (`#{inspect(type)}`): #{description} \n"
|
||||||
|
)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_child_header(file, %{key: key, type: type} = _child) do
|
||||||
|
IO.write(file, "- `#{inspect(key)}` (`#{inspect(type)}`) \n")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_suggestion(file, suggestion) when is_list(suggestion) do
|
||||||
|
IO.write(file, " `#{inspect(suggestion)}`\n")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_suggestion(file, suggestion) when is_function(suggestion) do
|
||||||
|
IO.write(file, " `#{inspect(suggestion.())}`\n")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_suggestion(file, suggestion, as_list \\ false) do
|
||||||
|
list_mark = if as_list, do: "- ", else: ""
|
||||||
|
IO.write(file, " #{list_mark}`#{inspect(suggestion)}`\n")
|
||||||
|
end
|
||||||
|
|
||||||
|
defp print_suggestions(_file, nil), do: nil
|
||||||
|
|
||||||
|
defp print_suggestions(_file, ""), do: nil
|
||||||
|
|
||||||
|
defp print_suggestions(file, suggestions) do
|
||||||
|
if length(suggestions) > 1 do
|
||||||
|
IO.write(file, "Suggestions:\n")
|
||||||
|
|
||||||
|
for suggestion <- suggestions do
|
||||||
|
print_suggestion(file, suggestion, true)
|
||||||
|
end
|
||||||
|
else
|
||||||
|
IO.write(file, " Suggestion: ")
|
||||||
|
|
||||||
|
print_suggestion(file, List.first(suggestions))
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,7 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Scheduler do
|
||||||
|
use Quantum.Scheduler, otp_app: :pleroma
|
||||||
|
end
|
@ -1,239 +0,0 @@
|
|||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Web.Federator.RetryQueue do
|
|
||||||
use GenServer
|
|
||||||
|
|
||||||
require Logger
|
|
||||||
|
|
||||||
def init(args) do
|
|
||||||
queue_table = :ets.new(:pleroma_retry_queue, [:bag, :protected])
|
|
||||||
|
|
||||||
{:ok, %{args | queue_table: queue_table, running_jobs: :sets.new()}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def start_link(_) do
|
|
||||||
enabled =
|
|
||||||
if Pleroma.Config.get(:env) == :test,
|
|
||||||
do: true,
|
|
||||||
else: Pleroma.Config.get([__MODULE__, :enabled], false)
|
|
||||||
|
|
||||||
if enabled do
|
|
||||||
Logger.info("Starting retry queue")
|
|
||||||
|
|
||||||
linkres =
|
|
||||||
GenServer.start_link(
|
|
||||||
__MODULE__,
|
|
||||||
%{delivered: 0, dropped: 0, queue_table: nil, running_jobs: nil},
|
|
||||||
name: __MODULE__
|
|
||||||
)
|
|
||||||
|
|
||||||
maybe_kickoff_timer()
|
|
||||||
linkres
|
|
||||||
else
|
|
||||||
Logger.info("Retry queue disabled")
|
|
||||||
:ignore
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def enqueue(data, transport, retries \\ 0) do
|
|
||||||
GenServer.cast(__MODULE__, {:maybe_enqueue, data, transport, retries + 1})
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_stats do
|
|
||||||
GenServer.call(__MODULE__, :get_stats)
|
|
||||||
end
|
|
||||||
|
|
||||||
def reset_stats do
|
|
||||||
GenServer.call(__MODULE__, :reset_stats)
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_retry_params(retries) do
|
|
||||||
if retries > Pleroma.Config.get([__MODULE__, :max_retries]) do
|
|
||||||
{:drop, "Max retries reached"}
|
|
||||||
else
|
|
||||||
{:retry, growth_function(retries)}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def get_retry_timer_interval do
|
|
||||||
Pleroma.Config.get([:retry_queue, :interval], 1000)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp ets_count_expires(table, current_time) do
|
|
||||||
:ets.select_count(
|
|
||||||
table,
|
|
||||||
[
|
|
||||||
{
|
|
||||||
{:"$1", :"$2"},
|
|
||||||
[{:"=<", :"$1", {:const, current_time}}],
|
|
||||||
[true]
|
|
||||||
}
|
|
||||||
]
|
|
||||||
)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp ets_pop_n_expired(table, current_time, desired) do
|
|
||||||
{popped, _continuation} =
|
|
||||||
:ets.select(
|
|
||||||
table,
|
|
||||||
[
|
|
||||||
{
|
|
||||||
{:"$1", :"$2"},
|
|
||||||
[{:"=<", :"$1", {:const, current_time}}],
|
|
||||||
[:"$_"]
|
|
||||||
}
|
|
||||||
],
|
|
||||||
desired
|
|
||||||
)
|
|
||||||
|
|
||||||
popped
|
|
||||||
|> Enum.each(fn e ->
|
|
||||||
:ets.delete_object(table, e)
|
|
||||||
end)
|
|
||||||
|
|
||||||
popped
|
|
||||||
end
|
|
||||||
|
|
||||||
def maybe_start_job(running_jobs, queue_table) do
|
|
||||||
# we don't want to hit the ets or the DateTime more times than we have to
|
|
||||||
# could optimize slightly further by not using the count, and instead grabbing
|
|
||||||
# up to N objects early...
|
|
||||||
current_time = DateTime.to_unix(DateTime.utc_now())
|
|
||||||
n_running_jobs = :sets.size(running_jobs)
|
|
||||||
|
|
||||||
if n_running_jobs < Pleroma.Config.get([__MODULE__, :max_jobs]) do
|
|
||||||
n_ready_jobs = ets_count_expires(queue_table, current_time)
|
|
||||||
|
|
||||||
if n_ready_jobs > 0 do
|
|
||||||
# figure out how many we could start
|
|
||||||
available_job_slots = Pleroma.Config.get([__MODULE__, :max_jobs]) - n_running_jobs
|
|
||||||
start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
|
||||||
else
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
else
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp start_n_jobs(running_jobs, _queue_table, _current_time, 0) do
|
|
||||||
running_jobs
|
|
||||||
end
|
|
||||||
|
|
||||||
defp start_n_jobs(running_jobs, queue_table, current_time, available_job_slots)
|
|
||||||
when available_job_slots > 0 do
|
|
||||||
candidates = ets_pop_n_expired(queue_table, current_time, available_job_slots)
|
|
||||||
|
|
||||||
candidates
|
|
||||||
|> List.foldl(running_jobs, fn {_, e}, rj ->
|
|
||||||
{:ok, pid} = Task.start(fn -> worker(e) end)
|
|
||||||
mref = Process.monitor(pid)
|
|
||||||
:sets.add_element(mref, rj)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def worker({:send, data, transport, retries}) do
|
|
||||||
case transport.publish_one(data) do
|
|
||||||
{:ok, _} ->
|
|
||||||
GenServer.cast(__MODULE__, :inc_delivered)
|
|
||||||
:delivered
|
|
||||||
|
|
||||||
{:error, _reason} ->
|
|
||||||
enqueue(data, transport, retries)
|
|
||||||
:retry
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_call(:get_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
|
||||||
{:reply, %{delivered: delivery_count, dropped: drop_count}, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_call(:reset_stats, _from, %{delivered: delivery_count, dropped: drop_count} = state) do
|
|
||||||
{:reply, %{delivered: delivery_count, dropped: drop_count},
|
|
||||||
%{state | delivered: 0, dropped: 0}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:reset_stats, state) do
|
|
||||||
{:noreply, %{state | delivered: 0, dropped: 0}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(
|
|
||||||
{:maybe_enqueue, data, transport, retries},
|
|
||||||
%{dropped: drop_count, queue_table: queue_table, running_jobs: running_jobs} = state
|
|
||||||
) do
|
|
||||||
case get_retry_params(retries) do
|
|
||||||
{:retry, timeout} ->
|
|
||||||
:ets.insert(queue_table, {timeout, {:send, data, transport, retries}})
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
|
|
||||||
{:drop, message} ->
|
|
||||||
Logger.debug(message)
|
|
||||||
{:noreply, %{state | dropped: drop_count + 1}}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:kickoff_timer, state) do
|
|
||||||
retry_interval = get_retry_timer_interval()
|
|
||||||
Process.send_after(__MODULE__, :retry_timer_run, retry_interval)
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:inc_delivered, %{delivered: delivery_count} = state) do
|
|
||||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(:inc_dropped, %{dropped: drop_count} = state) do
|
|
||||||
{:noreply, %{state | dropped: drop_count + 1}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:send, data, transport, retries}, %{delivered: delivery_count} = state) do
|
|
||||||
case transport.publish_one(data) do
|
|
||||||
{:ok, _} ->
|
|
||||||
{:noreply, %{state | delivered: delivery_count + 1}}
|
|
||||||
|
|
||||||
{:error, _reason} ->
|
|
||||||
enqueue(data, transport, retries)
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info(
|
|
||||||
:retry_timer_run,
|
|
||||||
%{queue_table: queue_table, running_jobs: running_jobs} = state
|
|
||||||
) do
|
|
||||||
maybe_kickoff_timer()
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
|
||||||
%{running_jobs: running_jobs, queue_table: queue_table} = state
|
|
||||||
running_jobs = :sets.del_element(ref, running_jobs)
|
|
||||||
running_jobs = maybe_start_job(running_jobs, queue_table)
|
|
||||||
{:noreply, %{state | running_jobs: running_jobs}}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info(unknown, state) do
|
|
||||||
Logger.debug("RetryQueue: don't know what to do with #{inspect(unknown)}, ignoring")
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
if Pleroma.Config.get(:env) == :test do
|
|
||||||
defp growth_function(_retries) do
|
|
||||||
_shutit = Pleroma.Config.get([__MODULE__, :initial_timeout])
|
|
||||||
DateTime.to_unix(DateTime.utc_now()) - 1
|
|
||||||
end
|
|
||||||
else
|
|
||||||
defp growth_function(retries) do
|
|
||||||
round(Pleroma.Config.get([__MODULE__, :initial_timeout]) * :math.pow(retries, 3)) +
|
|
||||||
DateTime.to_unix(DateTime.utc_now())
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp maybe_kickoff_timer do
|
|
||||||
GenServer.cast(__MODULE__, :kickoff_timer)
|
|
||||||
end
|
|
||||||
end
|
|
@ -1,318 +0,0 @@
|
|||||||
# Pleroma: A lightweight social networking server
|
|
||||||
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
|
||||||
# SPDX-License-Identifier: AGPL-3.0-only
|
|
||||||
|
|
||||||
defmodule Pleroma.Web.Streamer do
|
|
||||||
use GenServer
|
|
||||||
require Logger
|
|
||||||
alias Pleroma.Activity
|
|
||||||
alias Pleroma.Config
|
|
||||||
alias Pleroma.Conversation.Participation
|
|
||||||
alias Pleroma.Notification
|
|
||||||
alias Pleroma.Object
|
|
||||||
alias Pleroma.User
|
|
||||||
alias Pleroma.Web.ActivityPub.ActivityPub
|
|
||||||
alias Pleroma.Web.ActivityPub.Visibility
|
|
||||||
alias Pleroma.Web.CommonAPI
|
|
||||||
alias Pleroma.Web.MastodonAPI.NotificationView
|
|
||||||
|
|
||||||
@keepalive_interval :timer.seconds(30)
|
|
||||||
|
|
||||||
def start_link(_) do
|
|
||||||
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
|
|
||||||
end
|
|
||||||
|
|
||||||
def add_socket(topic, socket) do
|
|
||||||
GenServer.cast(__MODULE__, %{action: :add, socket: socket, topic: topic})
|
|
||||||
end
|
|
||||||
|
|
||||||
def remove_socket(topic, socket) do
|
|
||||||
GenServer.cast(__MODULE__, %{action: :remove, socket: socket, topic: topic})
|
|
||||||
end
|
|
||||||
|
|
||||||
def stream(topic, item) do
|
|
||||||
GenServer.cast(__MODULE__, %{action: :stream, topic: topic, item: item})
|
|
||||||
end
|
|
||||||
|
|
||||||
def init(args) do
|
|
||||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
|
|
||||||
|
|
||||||
{:ok, args}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_info(%{action: :ping}, topics) do
|
|
||||||
topics
|
|
||||||
|> Map.values()
|
|
||||||
|> List.flatten()
|
|
||||||
|> Enum.each(fn socket ->
|
|
||||||
Logger.debug("Sending keepalive ping")
|
|
||||||
send(socket.transport_pid, {:text, ""})
|
|
||||||
end)
|
|
||||||
|
|
||||||
Process.send_after(self(), %{action: :ping}, @keepalive_interval)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: "direct", item: item}, topics) do
|
|
||||||
recipient_topics =
|
|
||||||
User.get_recipients_from_activity(item)
|
|
||||||
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
|
|
||||||
|
|
||||||
Enum.each(recipient_topics || [], fn user_topic ->
|
|
||||||
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
|
|
||||||
push_to_socket(topics, user_topic, item)
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: "participation", item: participation}, topics) do
|
|
||||||
user_topic = "direct:#{participation.user_id}"
|
|
||||||
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
|
|
||||||
|
|
||||||
push_to_socket(topics, user_topic, participation)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: "list", item: item}, topics) do
|
|
||||||
# filter the recipient list if the activity is not public, see #270.
|
|
||||||
recipient_lists =
|
|
||||||
case Visibility.is_public?(item) do
|
|
||||||
true ->
|
|
||||||
Pleroma.List.get_lists_from_activity(item)
|
|
||||||
|
|
||||||
_ ->
|
|
||||||
Pleroma.List.get_lists_from_activity(item)
|
|
||||||
|> Enum.filter(fn list ->
|
|
||||||
owner = User.get_cached_by_id(list.user_id)
|
|
||||||
|
|
||||||
Visibility.visible_for_user?(item, owner)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
recipient_topics =
|
|
||||||
recipient_lists
|
|
||||||
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
|
|
||||||
|
|
||||||
Enum.each(recipient_topics || [], fn list_topic ->
|
|
||||||
Logger.debug("Trying to push message to #{list_topic}\n\n")
|
|
||||||
push_to_socket(topics, list_topic, item)
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(
|
|
||||||
%{action: :stream, topic: topic, item: %Notification{} = item},
|
|
||||||
topics
|
|
||||||
)
|
|
||||||
when topic in ["user", "user:notification"] do
|
|
||||||
topics
|
|
||||||
|> Map.get("#{topic}:#{item.user_id}", [])
|
|
||||||
|> Enum.each(fn socket ->
|
|
||||||
with %User{} = user <- User.get_cached_by_ap_id(socket.assigns[:user].ap_id),
|
|
||||||
true <- should_send?(user, item) do
|
|
||||||
send(
|
|
||||||
socket.transport_pid,
|
|
||||||
{:text, represent_notification(socket.assigns[:user], item)}
|
|
||||||
)
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: "user", item: item}, topics) do
|
|
||||||
Logger.debug("Trying to push to users")
|
|
||||||
|
|
||||||
recipient_topics =
|
|
||||||
User.get_recipients_from_activity(item)
|
|
||||||
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|
|
||||||
|
|
||||||
Enum.each(recipient_topics, fn topic ->
|
|
||||||
push_to_socket(topics, topic, item)
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :stream, topic: topic, item: item}, topics) do
|
|
||||||
Logger.debug("Trying to push to #{topic}")
|
|
||||||
Logger.debug("Pushing item to #{topic}")
|
|
||||||
push_to_socket(topics, topic, item)
|
|
||||||
{:noreply, topics}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :add, topic: topic, socket: socket}, sockets) do
|
|
||||||
topic = internal_topic(topic, socket)
|
|
||||||
sockets_for_topic = sockets[topic] || []
|
|
||||||
sockets_for_topic = Enum.uniq([socket | sockets_for_topic])
|
|
||||||
sockets = Map.put(sockets, topic, sockets_for_topic)
|
|
||||||
Logger.debug("Got new conn for #{topic}")
|
|
||||||
{:noreply, sockets}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(%{action: :remove, topic: topic, socket: socket}, sockets) do
|
|
||||||
topic = internal_topic(topic, socket)
|
|
||||||
sockets_for_topic = sockets[topic] || []
|
|
||||||
sockets_for_topic = List.delete(sockets_for_topic, socket)
|
|
||||||
sockets = Map.put(sockets, topic, sockets_for_topic)
|
|
||||||
Logger.debug("Removed conn for #{topic}")
|
|
||||||
{:noreply, sockets}
|
|
||||||
end
|
|
||||||
|
|
||||||
def handle_cast(m, state) do
|
|
||||||
Logger.info("Unknown: #{inspect(m)}, #{inspect(state)}")
|
|
||||||
{:noreply, state}
|
|
||||||
end
|
|
||||||
|
|
||||||
defp represent_update(%Activity{} = activity, %User{} = user) do
|
|
||||||
%{
|
|
||||||
event: "update",
|
|
||||||
payload:
|
|
||||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
|
||||||
"status.json",
|
|
||||||
activity: activity,
|
|
||||||
for: user
|
|
||||||
)
|
|
||||||
|> Jason.encode!()
|
|
||||||
}
|
|
||||||
|> Jason.encode!()
|
|
||||||
end
|
|
||||||
|
|
||||||
defp represent_update(%Activity{} = activity) do
|
|
||||||
%{
|
|
||||||
event: "update",
|
|
||||||
payload:
|
|
||||||
Pleroma.Web.MastodonAPI.StatusView.render(
|
|
||||||
"status.json",
|
|
||||||
activity: activity
|
|
||||||
)
|
|
||||||
|> Jason.encode!()
|
|
||||||
}
|
|
||||||
|> Jason.encode!()
|
|
||||||
end
|
|
||||||
|
|
||||||
def represent_conversation(%Participation{} = participation) do
|
|
||||||
%{
|
|
||||||
event: "conversation",
|
|
||||||
payload:
|
|
||||||
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
|
|
||||||
participation: participation,
|
|
||||||
for: participation.user
|
|
||||||
})
|
|
||||||
|> Jason.encode!()
|
|
||||||
}
|
|
||||||
|> Jason.encode!()
|
|
||||||
end
|
|
||||||
|
|
||||||
@spec represent_notification(User.t(), Notification.t()) :: binary()
|
|
||||||
defp represent_notification(%User{} = user, %Notification{} = notify) do
|
|
||||||
%{
|
|
||||||
event: "notification",
|
|
||||||
payload:
|
|
||||||
NotificationView.render(
|
|
||||||
"show.json",
|
|
||||||
%{notification: notify, for: user}
|
|
||||||
)
|
|
||||||
|> Jason.encode!()
|
|
||||||
}
|
|
||||||
|> Jason.encode!()
|
|
||||||
end
|
|
||||||
|
|
||||||
defp should_send?(%User{} = user, %Activity{} = item) do
|
|
||||||
blocks = user.info.blocks || []
|
|
||||||
mutes = user.info.mutes || []
|
|
||||||
reblog_mutes = user.info.muted_reblogs || []
|
|
||||||
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
|
|
||||||
|
|
||||||
with parent when not is_nil(parent) <- Object.normalize(item),
|
|
||||||
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
|
|
||||||
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
|
|
||||||
%{host: item_host} <- URI.parse(item.actor),
|
|
||||||
%{host: parent_host} <- URI.parse(parent.data["actor"]),
|
|
||||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
|
|
||||||
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
|
|
||||||
true <- thread_containment(item, user),
|
|
||||||
false <- CommonAPI.thread_muted?(user, item) do
|
|
||||||
true
|
|
||||||
else
|
|
||||||
_ -> false
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
|
||||||
should_send?(user, activity)
|
|
||||||
end
|
|
||||||
|
|
||||||
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
|
||||||
Enum.each(topics[topic] || [], fn socket ->
|
|
||||||
# Get the current user so we have up-to-date blocks etc.
|
|
||||||
if socket.assigns[:user] do
|
|
||||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
|
|
||||||
|
|
||||||
if should_send?(user, item) do
|
|
||||||
send(socket.transport_pid, {:text, represent_update(item, user)})
|
|
||||||
end
|
|
||||||
else
|
|
||||||
send(socket.transport_pid, {:text, represent_update(item)})
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def push_to_socket(topics, topic, %Participation{} = participation) do
|
|
||||||
Enum.each(topics[topic] || [], fn socket ->
|
|
||||||
send(socket.transport_pid, {:text, represent_conversation(participation)})
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def push_to_socket(topics, topic, %Activity{
|
|
||||||
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
|
|
||||||
}) do
|
|
||||||
Enum.each(topics[topic] || [], fn socket ->
|
|
||||||
send(
|
|
||||||
socket.transport_pid,
|
|
||||||
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
|
|
||||||
)
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
|
|
||||||
|
|
||||||
def push_to_socket(topics, topic, item) do
|
|
||||||
Enum.each(topics[topic] || [], fn socket ->
|
|
||||||
# Get the current user so we have up-to-date blocks etc.
|
|
||||||
if socket.assigns[:user] do
|
|
||||||
user = User.get_cached_by_ap_id(socket.assigns[:user].ap_id)
|
|
||||||
blocks = user.info.blocks || []
|
|
||||||
mutes = user.info.mutes || []
|
|
||||||
|
|
||||||
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
|
|
||||||
true <- thread_containment(item, user) do
|
|
||||||
send(socket.transport_pid, {:text, represent_update(item, user)})
|
|
||||||
end
|
|
||||||
else
|
|
||||||
send(socket.transport_pid, {:text, represent_update(item)})
|
|
||||||
end
|
|
||||||
end)
|
|
||||||
end
|
|
||||||
|
|
||||||
defp internal_topic(topic, socket) when topic in ~w[user user:notification direct] do
|
|
||||||
"#{topic}:#{socket.assigns[:user].id}"
|
|
||||||
end
|
|
||||||
|
|
||||||
defp internal_topic(topic, _), do: topic
|
|
||||||
|
|
||||||
@spec thread_containment(Activity.t(), User.t()) :: boolean()
|
|
||||||
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
|
|
||||||
|
|
||||||
defp thread_containment(activity, user) do
|
|
||||||
if Config.get([:instance, :skip_thread_containment]) do
|
|
||||||
true
|
|
||||||
else
|
|
||||||
ActivityPub.contain_activity(activity, user)
|
|
||||||
end
|
|
||||||
end
|
|
||||||
end
|
|
@ -0,0 +1,33 @@
|
|||||||
|
defmodule Pleroma.Web.Streamer.Ping do
|
||||||
|
use GenServer
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.Streamer.State
|
||||||
|
alias Pleroma.Web.Streamer.StreamerSocket
|
||||||
|
|
||||||
|
@keepalive_interval :timer.seconds(30)
|
||||||
|
|
||||||
|
def start_link(opts) do
|
||||||
|
ping_interval = Keyword.get(opts, :ping_interval, @keepalive_interval)
|
||||||
|
GenServer.start_link(__MODULE__, %{ping_interval: ping_interval}, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(%{ping_interval: ping_interval} = args) do
|
||||||
|
Process.send_after(self(), :ping, ping_interval)
|
||||||
|
{:ok, args}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_info(:ping, %{ping_interval: ping_interval} = state) do
|
||||||
|
State.get_sockets()
|
||||||
|
|> Map.values()
|
||||||
|
|> List.flatten()
|
||||||
|
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||||
|
Logger.debug("Sending keepalive ping")
|
||||||
|
send(transport_pid, {:text, ""})
|
||||||
|
end)
|
||||||
|
|
||||||
|
Process.send_after(self(), :ping, ping_interval)
|
||||||
|
|
||||||
|
{:noreply, state}
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,78 @@
|
|||||||
|
defmodule Pleroma.Web.Streamer.State do
|
||||||
|
use GenServer
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Web.Streamer.StreamerSocket
|
||||||
|
|
||||||
|
@env Mix.env()
|
||||||
|
|
||||||
|
def start_link(_) do
|
||||||
|
GenServer.start_link(__MODULE__, %{sockets: %{}}, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def add_socket(topic, socket) do
|
||||||
|
GenServer.call(__MODULE__, {:add, topic, socket})
|
||||||
|
end
|
||||||
|
|
||||||
|
def remove_socket(topic, socket) do
|
||||||
|
do_remove_socket(@env, topic, socket)
|
||||||
|
end
|
||||||
|
|
||||||
|
def get_sockets do
|
||||||
|
%{sockets: stream_sockets} = GenServer.call(__MODULE__, :get_state)
|
||||||
|
stream_sockets
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(init_arg) do
|
||||||
|
{:ok, init_arg}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call(:get_state, _from, state) do
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call({:add, topic, socket}, _from, %{sockets: sockets} = state) do
|
||||||
|
internal_topic = internal_topic(topic, socket)
|
||||||
|
stream_socket = StreamerSocket.from_socket(socket)
|
||||||
|
|
||||||
|
sockets_for_topic =
|
||||||
|
sockets
|
||||||
|
|> Map.get(internal_topic, [])
|
||||||
|
|> List.insert_at(0, stream_socket)
|
||||||
|
|> Enum.uniq()
|
||||||
|
|
||||||
|
state = put_in(state, [:sockets, internal_topic], sockets_for_topic)
|
||||||
|
Logger.debug("Got new conn for #{topic}")
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call({:remove, topic, socket}, _from, %{sockets: sockets} = state) do
|
||||||
|
internal_topic = internal_topic(topic, socket)
|
||||||
|
stream_socket = StreamerSocket.from_socket(socket)
|
||||||
|
|
||||||
|
sockets_for_topic =
|
||||||
|
sockets
|
||||||
|
|> Map.get(internal_topic, [])
|
||||||
|
|> List.delete(stream_socket)
|
||||||
|
|
||||||
|
state = Kernel.put_in(state, [:sockets, internal_topic], sockets_for_topic)
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_remove_socket(:test, _, _) do
|
||||||
|
:ok
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_remove_socket(_env, topic, socket) do
|
||||||
|
GenServer.call(__MODULE__, {:remove, topic, socket})
|
||||||
|
end
|
||||||
|
|
||||||
|
defp internal_topic(topic, socket)
|
||||||
|
when topic in ~w[user user:notification direct] do
|
||||||
|
"#{topic}:#{socket.assigns[:user].id}"
|
||||||
|
end
|
||||||
|
|
||||||
|
defp internal_topic(topic, _) do
|
||||||
|
topic
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,55 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.Streamer do
|
||||||
|
alias Pleroma.Web.Streamer.State
|
||||||
|
alias Pleroma.Web.Streamer.Worker
|
||||||
|
|
||||||
|
@timeout 60_000
|
||||||
|
@mix_env Mix.env()
|
||||||
|
|
||||||
|
def add_socket(topic, socket) do
|
||||||
|
State.add_socket(topic, socket)
|
||||||
|
end
|
||||||
|
|
||||||
|
def remove_socket(topic, socket) do
|
||||||
|
State.remove_socket(topic, socket)
|
||||||
|
end
|
||||||
|
|
||||||
|
def get_sockets do
|
||||||
|
State.get_sockets()
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream(topics, items) do
|
||||||
|
if should_send?() do
|
||||||
|
Task.async(fn ->
|
||||||
|
:poolboy.transaction(
|
||||||
|
:streamer_worker,
|
||||||
|
&Worker.stream(&1, topics, items),
|
||||||
|
@timeout
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def supervisor, do: Pleroma.Web.Streamer.Supervisor
|
||||||
|
|
||||||
|
defp should_send? do
|
||||||
|
handle_should_send(@mix_env)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_should_send(:test) do
|
||||||
|
case Process.whereis(:streamer_worker) do
|
||||||
|
nil ->
|
||||||
|
false
|
||||||
|
|
||||||
|
pid ->
|
||||||
|
Process.alive?(pid)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_should_send(_) do
|
||||||
|
true
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,31 @@
|
|||||||
|
defmodule Pleroma.Web.Streamer.StreamerSocket do
|
||||||
|
defstruct transport_pid: nil, user: nil
|
||||||
|
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.Streamer.StreamerSocket
|
||||||
|
|
||||||
|
def from_socket(%{
|
||||||
|
transport_pid: transport_pid,
|
||||||
|
assigns: %{user: nil}
|
||||||
|
}) do
|
||||||
|
%StreamerSocket{
|
||||||
|
transport_pid: transport_pid
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def from_socket(%{
|
||||||
|
transport_pid: transport_pid,
|
||||||
|
assigns: %{user: %User{} = user}
|
||||||
|
}) do
|
||||||
|
%StreamerSocket{
|
||||||
|
transport_pid: transport_pid,
|
||||||
|
user: user
|
||||||
|
}
|
||||||
|
end
|
||||||
|
|
||||||
|
def from_socket(%{transport_pid: transport_pid}) do
|
||||||
|
%StreamerSocket{
|
||||||
|
transport_pid: transport_pid
|
||||||
|
}
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,33 @@
|
|||||||
|
defmodule Pleroma.Web.Streamer.Supervisor do
|
||||||
|
use Supervisor
|
||||||
|
|
||||||
|
def start_link(opts) do
|
||||||
|
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(args) do
|
||||||
|
children = [
|
||||||
|
{Pleroma.Web.Streamer.State, args},
|
||||||
|
{Pleroma.Web.Streamer.Ping, args},
|
||||||
|
:poolboy.child_spec(:streamer_worker, poolboy_config())
|
||||||
|
]
|
||||||
|
|
||||||
|
opts = [strategy: :one_for_one, name: Pleroma.Web.Streamer.Supervisor]
|
||||||
|
Supervisor.init(children, opts)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp poolboy_config do
|
||||||
|
opts =
|
||||||
|
Pleroma.Config.get(:streamer,
|
||||||
|
workers: 3,
|
||||||
|
overflow_workers: 2
|
||||||
|
)
|
||||||
|
|
||||||
|
[
|
||||||
|
{:name, {:local, :streamer_worker}},
|
||||||
|
{:worker_module, Pleroma.Web.Streamer.Worker},
|
||||||
|
{:size, opts[:workers]},
|
||||||
|
{:max_overflow, opts[:overflow_workers]}
|
||||||
|
]
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,220 @@
|
|||||||
|
defmodule Pleroma.Web.Streamer.Worker do
|
||||||
|
use GenServer
|
||||||
|
|
||||||
|
require Logger
|
||||||
|
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Config
|
||||||
|
alias Pleroma.Conversation.Participation
|
||||||
|
alias Pleroma.Notification
|
||||||
|
alias Pleroma.Object
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.ActivityPub.ActivityPub
|
||||||
|
alias Pleroma.Web.ActivityPub.Visibility
|
||||||
|
alias Pleroma.Web.CommonAPI
|
||||||
|
alias Pleroma.Web.Streamer.State
|
||||||
|
alias Pleroma.Web.Streamer.StreamerSocket
|
||||||
|
alias Pleroma.Web.StreamerView
|
||||||
|
|
||||||
|
def start_link(_) do
|
||||||
|
GenServer.start_link(__MODULE__, %{}, [])
|
||||||
|
end
|
||||||
|
|
||||||
|
def init(init_arg) do
|
||||||
|
{:ok, init_arg}
|
||||||
|
end
|
||||||
|
|
||||||
|
def stream(pid, topics, items) do
|
||||||
|
GenServer.call(pid, {:stream, topics, items})
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call({:stream, topics, item}, _from, state) when is_list(topics) do
|
||||||
|
Enum.each(topics, fn t ->
|
||||||
|
do_stream(%{topic: t, item: item})
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call({:stream, topic, items}, _from, state) when is_list(items) do
|
||||||
|
Enum.each(items, fn i ->
|
||||||
|
do_stream(%{topic: topic, item: i})
|
||||||
|
end)
|
||||||
|
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
def handle_call({:stream, topic, item}, _from, state) do
|
||||||
|
do_stream(%{topic: topic, item: item})
|
||||||
|
|
||||||
|
{:reply, state, state}
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: "direct", item: item}) do
|
||||||
|
recipient_topics =
|
||||||
|
User.get_recipients_from_activity(item)
|
||||||
|
|> Enum.map(fn %{id: id} -> "direct:#{id}" end)
|
||||||
|
|
||||||
|
Enum.each(recipient_topics, fn user_topic ->
|
||||||
|
Logger.debug("Trying to push direct message to #{user_topic}\n\n")
|
||||||
|
push_to_socket(State.get_sockets(), user_topic, item)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: "participation", item: participation}) do
|
||||||
|
user_topic = "direct:#{participation.user_id}"
|
||||||
|
Logger.debug("Trying to push a conversation participation to #{user_topic}\n\n")
|
||||||
|
|
||||||
|
push_to_socket(State.get_sockets(), user_topic, participation)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: "list", item: item}) do
|
||||||
|
# filter the recipient list if the activity is not public, see #270.
|
||||||
|
recipient_lists =
|
||||||
|
case Visibility.is_public?(item) do
|
||||||
|
true ->
|
||||||
|
Pleroma.List.get_lists_from_activity(item)
|
||||||
|
|
||||||
|
_ ->
|
||||||
|
Pleroma.List.get_lists_from_activity(item)
|
||||||
|
|> Enum.filter(fn list ->
|
||||||
|
owner = User.get_cached_by_id(list.user_id)
|
||||||
|
|
||||||
|
Visibility.visible_for_user?(item, owner)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
recipient_topics =
|
||||||
|
recipient_lists
|
||||||
|
|> Enum.map(fn %{id: id} -> "list:#{id}" end)
|
||||||
|
|
||||||
|
Enum.each(recipient_topics, fn list_topic ->
|
||||||
|
Logger.debug("Trying to push message to #{list_topic}\n\n")
|
||||||
|
push_to_socket(State.get_sockets(), list_topic, item)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: topic, item: %Notification{} = item})
|
||||||
|
when topic in ["user", "user:notification"] do
|
||||||
|
State.get_sockets()
|
||||||
|
|> Map.get("#{topic}:#{item.user_id}", [])
|
||||||
|
|> Enum.each(fn %StreamerSocket{transport_pid: transport_pid, user: socket_user} ->
|
||||||
|
with %User{} = user <- User.get_cached_by_ap_id(socket_user.ap_id),
|
||||||
|
true <- should_send?(user, item) do
|
||||||
|
send(transport_pid, {:text, StreamerView.render("notification.json", socket_user, item)})
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: "user", item: item}) do
|
||||||
|
Logger.debug("Trying to push to users")
|
||||||
|
|
||||||
|
recipient_topics =
|
||||||
|
User.get_recipients_from_activity(item)
|
||||||
|
|> Enum.map(fn %{id: id} -> "user:#{id}" end)
|
||||||
|
|
||||||
|
Enum.each(recipient_topics, fn topic ->
|
||||||
|
push_to_socket(State.get_sockets(), topic, item)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp do_stream(%{topic: topic, item: item}) do
|
||||||
|
Logger.debug("Trying to push to #{topic}")
|
||||||
|
Logger.debug("Pushing item to #{topic}")
|
||||||
|
push_to_socket(State.get_sockets(), topic, item)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp should_send?(%User{} = user, %Activity{} = item) do
|
||||||
|
blocks = user.info.blocks || []
|
||||||
|
mutes = user.info.mutes || []
|
||||||
|
reblog_mutes = user.info.muted_reblogs || []
|
||||||
|
domain_blocks = Pleroma.Web.ActivityPub.MRF.subdomains_regex(user.info.domain_blocks)
|
||||||
|
|
||||||
|
with parent when not is_nil(parent) <- Object.normalize(item),
|
||||||
|
true <- Enum.all?([blocks, mutes, reblog_mutes], &(item.actor not in &1)),
|
||||||
|
true <- Enum.all?([blocks, mutes], &(parent.data["actor"] not in &1)),
|
||||||
|
%{host: item_host} <- URI.parse(item.actor),
|
||||||
|
%{host: parent_host} <- URI.parse(parent.data["actor"]),
|
||||||
|
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, item_host),
|
||||||
|
false <- Pleroma.Web.ActivityPub.MRF.subdomain_match?(domain_blocks, parent_host),
|
||||||
|
true <- thread_containment(item, user),
|
||||||
|
false <- CommonAPI.thread_muted?(user, item) do
|
||||||
|
true
|
||||||
|
else
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp should_send?(%User{} = user, %Notification{activity: activity}) do
|
||||||
|
should_send?(user, activity)
|
||||||
|
end
|
||||||
|
|
||||||
|
def push_to_socket(topics, topic, %Activity{data: %{"type" => "Announce"}} = item) do
|
||||||
|
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
||||||
|
transport_pid: transport_pid,
|
||||||
|
user: socket_user
|
||||||
|
} ->
|
||||||
|
# Get the current user so we have up-to-date blocks etc.
|
||||||
|
if socket_user do
|
||||||
|
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
||||||
|
|
||||||
|
if should_send?(user, item) do
|
||||||
|
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
||||||
|
end
|
||||||
|
else
|
||||||
|
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def push_to_socket(topics, topic, %Participation{} = participation) do
|
||||||
|
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||||
|
send(transport_pid, {:text, StreamerView.render("conversation.json", participation)})
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def push_to_socket(topics, topic, %Activity{
|
||||||
|
data: %{"type" => "Delete", "deleted_activity_id" => deleted_activity_id}
|
||||||
|
}) do
|
||||||
|
Enum.each(topics[topic] || [], fn %StreamerSocket{transport_pid: transport_pid} ->
|
||||||
|
send(
|
||||||
|
transport_pid,
|
||||||
|
{:text, %{event: "delete", payload: to_string(deleted_activity_id)} |> Jason.encode!()}
|
||||||
|
)
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
def push_to_socket(_topics, _topic, %Activity{data: %{"type" => "Delete"}}), do: :noop
|
||||||
|
|
||||||
|
def push_to_socket(topics, topic, item) do
|
||||||
|
Enum.each(topics[topic] || [], fn %StreamerSocket{
|
||||||
|
transport_pid: transport_pid,
|
||||||
|
user: socket_user
|
||||||
|
} ->
|
||||||
|
# Get the current user so we have up-to-date blocks etc.
|
||||||
|
if socket_user do
|
||||||
|
user = User.get_cached_by_ap_id(socket_user.ap_id)
|
||||||
|
blocks = user.info.blocks || []
|
||||||
|
mutes = user.info.mutes || []
|
||||||
|
|
||||||
|
with true <- Enum.all?([blocks, mutes], &(item.actor not in &1)),
|
||||||
|
true <- thread_containment(item, user) do
|
||||||
|
send(transport_pid, {:text, StreamerView.render("update.json", item, user)})
|
||||||
|
end
|
||||||
|
else
|
||||||
|
send(transport_pid, {:text, StreamerView.render("update.json", item)})
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
end
|
||||||
|
|
||||||
|
@spec thread_containment(Activity.t(), User.t()) :: boolean()
|
||||||
|
defp thread_containment(_activity, %User{info: %{skip_thread_containment: true}}), do: true
|
||||||
|
|
||||||
|
defp thread_containment(activity, user) do
|
||||||
|
if Config.get([:instance, :skip_thread_containment]) do
|
||||||
|
true
|
||||||
|
else
|
||||||
|
ActivityPub.contain_activity(activity, user)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,66 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Web.StreamerView do
|
||||||
|
use Pleroma.Web, :view
|
||||||
|
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Conversation.Participation
|
||||||
|
alias Pleroma.Notification
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.MastodonAPI.NotificationView
|
||||||
|
|
||||||
|
def render("update.json", %Activity{} = activity, %User{} = user) do
|
||||||
|
%{
|
||||||
|
event: "update",
|
||||||
|
payload:
|
||||||
|
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||||
|
"status.json",
|
||||||
|
activity: activity,
|
||||||
|
for: user
|
||||||
|
)
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
|
||||||
|
def render("notification.json", %User{} = user, %Notification{} = notify) do
|
||||||
|
%{
|
||||||
|
event: "notification",
|
||||||
|
payload:
|
||||||
|
NotificationView.render(
|
||||||
|
"show.json",
|
||||||
|
%{notification: notify, for: user}
|
||||||
|
)
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
|
||||||
|
def render("update.json", %Activity{} = activity) do
|
||||||
|
%{
|
||||||
|
event: "update",
|
||||||
|
payload:
|
||||||
|
Pleroma.Web.MastodonAPI.StatusView.render(
|
||||||
|
"status.json",
|
||||||
|
activity: activity
|
||||||
|
)
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
|
||||||
|
def render("conversation.json", %Participation{} = participation) do
|
||||||
|
%{
|
||||||
|
event: "conversation",
|
||||||
|
payload:
|
||||||
|
Pleroma.Web.MastodonAPI.ConversationView.render("participation.json", %{
|
||||||
|
participation: participation,
|
||||||
|
for: participation.user
|
||||||
|
})
|
||||||
|
|> Jason.encode!()
|
||||||
|
}
|
||||||
|
|> Jason.encode!()
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,18 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.ActivityExpirationWorker do
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "activity_expiration"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(
|
||||||
|
%{
|
||||||
|
"op" => "activity_expiration",
|
||||||
|
"activity_expiration_id" => activity_expiration_id
|
||||||
|
},
|
||||||
|
_job
|
||||||
|
) do
|
||||||
|
Pleroma.Daemons.ActivityExpirationDaemon.perform(:execute, activity_expiration_id)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,69 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.BackgroundWorker do
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.User
|
||||||
|
alias Pleroma.Web.ActivityPub.MRF.MediaProxyWarmingPolicy
|
||||||
|
alias Pleroma.Web.OAuth.Token.CleanWorker
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "background"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "fetch_initial_posts", "user_id" => user_id}, _job) do
|
||||||
|
user = User.get_cached_by_id(user_id)
|
||||||
|
User.perform(:fetch_initial_posts, user)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "deactivate_user", "user_id" => user_id, "status" => status}, _job) do
|
||||||
|
user = User.get_cached_by_id(user_id)
|
||||||
|
User.perform(:deactivate_async, user, status)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "delete_user", "user_id" => user_id}, _job) do
|
||||||
|
user = User.get_cached_by_id(user_id)
|
||||||
|
User.perform(:delete, user)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(
|
||||||
|
%{
|
||||||
|
"op" => "blocks_import",
|
||||||
|
"blocker_id" => blocker_id,
|
||||||
|
"blocked_identifiers" => blocked_identifiers
|
||||||
|
},
|
||||||
|
_job
|
||||||
|
) do
|
||||||
|
blocker = User.get_cached_by_id(blocker_id)
|
||||||
|
User.perform(:blocks_import, blocker, blocked_identifiers)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(
|
||||||
|
%{
|
||||||
|
"op" => "follow_import",
|
||||||
|
"follower_id" => follower_id,
|
||||||
|
"followed_identifiers" => followed_identifiers
|
||||||
|
},
|
||||||
|
_job
|
||||||
|
) do
|
||||||
|
follower = User.get_cached_by_id(follower_id)
|
||||||
|
User.perform(:follow_import, follower, followed_identifiers)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "clean_expired_tokens"}, _job) do
|
||||||
|
CleanWorker.perform(:clean)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "media_proxy_preload", "message" => message}, _job) do
|
||||||
|
MediaProxyWarmingPolicy.perform(:preload, message)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "media_proxy_prefetch", "url" => url}, _job) do
|
||||||
|
MediaProxyWarmingPolicy.perform(:prefetch, url)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "fetch_data_for_activity", "activity_id" => activity_id}, _job) do
|
||||||
|
activity = Activity.get_by_id(activity_id)
|
||||||
|
Pleroma.Web.RichMedia.Helpers.perform(:fetch, activity)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,16 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.DigestEmailsWorker do
|
||||||
|
alias Pleroma.User
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "digest_emails"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "digest_email", "user_id" => user_id}, _job) do
|
||||||
|
user_id
|
||||||
|
|> User.get_cached_by_id()
|
||||||
|
|> Pleroma.Daemons.DigestEmailDaemon.perform()
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,15 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.MailerWorker do
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "mailer"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "email", "encoded_email" => encoded_email, "config" => config}, _job) do
|
||||||
|
encoded_email
|
||||||
|
|> Base.decode64!()
|
||||||
|
|> :erlang.binary_to_term()
|
||||||
|
|> Pleroma.Emails.Mailer.deliver(config)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,25 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.PublisherWorker do
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Web.Federator
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
|
||||||
|
|
||||||
|
def backoff(attempt) when is_integer(attempt) do
|
||||||
|
Pleroma.Workers.WorkerHelper.sidekiq_backoff(attempt, 5)
|
||||||
|
end
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "publish", "activity_id" => activity_id}, _job) do
|
||||||
|
activity = Activity.get_by_id(activity_id)
|
||||||
|
Federator.perform(:publish, activity)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "publish_one", "module" => module_name, "params" => params}, _job) do
|
||||||
|
params = Map.new(params, fn {k, v} -> {String.to_atom(k), v} end)
|
||||||
|
Federator.perform(:publish_one, String.to_atom(module_name), params)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,18 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.ReceiverWorker do
|
||||||
|
alias Pleroma.Web.Federator
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "federator_incoming"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "incoming_doc", "body" => doc}, _job) do
|
||||||
|
Federator.perform(:incoming_doc, doc)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "incoming_ap_doc", "params" => params}, _job) do
|
||||||
|
Federator.perform(:incoming_ap_doc, params)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,12 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.ScheduledActivityWorker do
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "scheduled_activities"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "execute", "activity_id" => activity_id}, _job) do
|
||||||
|
Pleroma.Daemons.ScheduledActivityDaemon.perform(:execute, activity_id)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,26 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.SubscriberWorker do
|
||||||
|
alias Pleroma.Repo
|
||||||
|
alias Pleroma.Web.Federator
|
||||||
|
alias Pleroma.Web.Websub
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "federator_outgoing"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "refresh_subscriptions"}, _job) do
|
||||||
|
Federator.perform(:refresh_subscriptions)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "request_subscription", "websub_id" => websub_id}, _job) do
|
||||||
|
websub = Repo.get(Websub.WebsubClientSubscription, websub_id)
|
||||||
|
Federator.perform(:request_subscription, websub)
|
||||||
|
end
|
||||||
|
|
||||||
|
def perform(%{"op" => "verify_websub", "websub_id" => websub_id}, _job) do
|
||||||
|
websub = Repo.get(Websub.WebsubServerSubscription, websub_id)
|
||||||
|
Federator.perform(:verify_websub, websub)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,15 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.TransmogrifierWorker do
|
||||||
|
alias Pleroma.User
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "transmogrifier"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "user_upgrade", "user_id" => user_id}, _job) do
|
||||||
|
user = User.get_cached_by_id(user_id)
|
||||||
|
Pleroma.Web.ActivityPub.Transmogrifier.perform(:user_upgrade, user)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,20 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.WebPusherWorker do
|
||||||
|
alias Pleroma.Notification
|
||||||
|
alias Pleroma.Repo
|
||||||
|
|
||||||
|
use Pleroma.Workers.WorkerHelper, queue: "web_push"
|
||||||
|
|
||||||
|
@impl Oban.Worker
|
||||||
|
def perform(%{"op" => "web_push", "notification_id" => notification_id}, _job) do
|
||||||
|
notification =
|
||||||
|
Notification
|
||||||
|
|> Repo.get(notification_id)
|
||||||
|
|> Repo.preload([:activity])
|
||||||
|
|
||||||
|
Pleroma.Web.Push.Impl.perform(notification)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,46 @@
|
|||||||
|
# Pleroma: A lightweight social networking server
|
||||||
|
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social/>
|
||||||
|
# SPDX-License-Identifier: AGPL-3.0-only
|
||||||
|
|
||||||
|
defmodule Pleroma.Workers.WorkerHelper do
|
||||||
|
alias Pleroma.Config
|
||||||
|
alias Pleroma.Workers.WorkerHelper
|
||||||
|
|
||||||
|
def worker_args(queue) do
|
||||||
|
case Config.get([:workers, :retries, queue]) do
|
||||||
|
nil -> []
|
||||||
|
max_attempts -> [max_attempts: max_attempts]
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
|
||||||
|
backoff =
|
||||||
|
:math.pow(attempt, pow) +
|
||||||
|
base_backoff +
|
||||||
|
:rand.uniform(2 * base_backoff) * attempt
|
||||||
|
|
||||||
|
trunc(backoff)
|
||||||
|
end
|
||||||
|
|
||||||
|
defmacro __using__(opts) do
|
||||||
|
caller_module = __CALLER__.module
|
||||||
|
queue = Keyword.fetch!(opts, :queue)
|
||||||
|
|
||||||
|
quote do
|
||||||
|
# Note: `max_attempts` is intended to be overridden in `new/2` call
|
||||||
|
use Oban.Worker,
|
||||||
|
queue: unquote(queue),
|
||||||
|
max_attempts: 1
|
||||||
|
|
||||||
|
def enqueue(op, params, worker_args \\ []) do
|
||||||
|
params = Map.merge(%{"op" => op}, params)
|
||||||
|
queue_atom = String.to_atom(unquote(queue))
|
||||||
|
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
|
||||||
|
|
||||||
|
unquote(caller_module)
|
||||||
|
|> apply(:new, [params, worker_args])
|
||||||
|
|> Pleroma.Repo.insert()
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,6 @@
|
|||||||
|
defmodule Pleroma.Repo.Migrations.AddObanJobsTable do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
defdelegate up, to: Oban.Migrations
|
||||||
|
defdelegate down, to: Oban.Migrations
|
||||||
|
end
|
@ -0,0 +1,12 @@
|
|||||||
|
defmodule Pleroma.Repo.Migrations.CreateDeliveries do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def change do
|
||||||
|
create_if_not_exists table(:deliveries) do
|
||||||
|
add(:object_id, references(:objects, type: :id), null: false)
|
||||||
|
add(:user_id, references(:users, type: :uuid, on_delete: :delete_all), null: false)
|
||||||
|
end
|
||||||
|
create_if_not_exists index(:deliveries, :object_id, name: :deliveries_object_id)
|
||||||
|
create_if_not_exists(unique_index(:deliveries, [:user_id, :object_id]))
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,11 @@
|
|||||||
|
defmodule Pleroma.Repo.Migrations.UpdateOban do
|
||||||
|
use Ecto.Migration
|
||||||
|
|
||||||
|
def up do
|
||||||
|
Oban.Migrations.up(version: 4)
|
||||||
|
end
|
||||||
|
|
||||||
|
def down do
|
||||||
|
Oban.Migrations.down(version: 2)
|
||||||
|
end
|
||||||
|
end
|
@ -0,0 +1,141 @@
|
|||||||
|
defmodule Pleroma.Activity.Ir.TopicsTest do
|
||||||
|
use Pleroma.DataCase
|
||||||
|
|
||||||
|
alias Pleroma.Activity
|
||||||
|
alias Pleroma.Activity.Ir.Topics
|
||||||
|
alias Pleroma.Object
|
||||||
|
|
||||||
|
require Pleroma.Constants
|
||||||
|
|
||||||
|
describe "poll answer" do
|
||||||
|
test "produce no topics" do
|
||||||
|
activity = %Activity{object: %Object{data: %{"type" => "Answer"}}}
|
||||||
|
|
||||||
|
assert [] == Topics.get_activity_topics(activity)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "non poll answer" do
|
||||||
|
test "always add user and list topics" do
|
||||||
|
activity = %Activity{object: %Object{data: %{"type" => "FooBar"}}}
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "user")
|
||||||
|
assert Enum.member?(topics, "list")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "public visibility" do
|
||||||
|
setup do
|
||||||
|
activity = %Activity{
|
||||||
|
object: %Object{data: %{"type" => "Note"}},
|
||||||
|
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||||
|
}
|
||||||
|
|
||||||
|
{:ok, activity: activity}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "produces public topic", %{activity: activity} do
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "public")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "local action produces public:local topic", %{activity: activity} do
|
||||||
|
activity = %{activity | local: true}
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "public:local")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "non-local action does not produce public:local topic", %{activity: activity} do
|
||||||
|
activity = %{activity | local: false}
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
refute Enum.member?(topics, "public:local")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "public visibility create events" do
|
||||||
|
setup do
|
||||||
|
activity = %Activity{
|
||||||
|
object: %Object{data: %{"type" => "Create", "attachment" => []}},
|
||||||
|
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||||
|
}
|
||||||
|
|
||||||
|
{:ok, activity: activity}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "with no attachments doesn't produce public:media topics", %{activity: activity} do
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
refute Enum.member?(topics, "public:media")
|
||||||
|
refute Enum.member?(topics, "public:local:media")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "converts tags to hash tags", %{activity: %{object: %{data: data} = object} = activity} do
|
||||||
|
tagged_data = Map.put(data, "tag", ["foo", "bar"])
|
||||||
|
activity = %{activity | object: %{object | data: tagged_data}}
|
||||||
|
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "hashtag:foo")
|
||||||
|
assert Enum.member?(topics, "hashtag:bar")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "only converts strinngs to hash tags", %{
|
||||||
|
activity: %{object: %{data: data} = object} = activity
|
||||||
|
} do
|
||||||
|
tagged_data = Map.put(data, "tag", [2])
|
||||||
|
activity = %{activity | object: %{object | data: tagged_data}}
|
||||||
|
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
refute Enum.member?(topics, "hashtag:2")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "public visibility create events with attachments" do
|
||||||
|
setup do
|
||||||
|
activity = %Activity{
|
||||||
|
object: %Object{data: %{"type" => "Create", "attachment" => ["foo"]}},
|
||||||
|
data: %{"to" => [Pleroma.Constants.as_public()]}
|
||||||
|
}
|
||||||
|
|
||||||
|
{:ok, activity: activity}
|
||||||
|
end
|
||||||
|
|
||||||
|
test "produce public:media topics", %{activity: activity} do
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "public:media")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "local produces public:local:media topics", %{activity: activity} do
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "public:local:media")
|
||||||
|
end
|
||||||
|
|
||||||
|
test "non-local doesn't produce public:local:media topics", %{activity: activity} do
|
||||||
|
activity = %{activity | local: false}
|
||||||
|
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
refute Enum.member?(topics, "public:local:media")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
describe "non-public visibility" do
|
||||||
|
test "produces direct topic" do
|
||||||
|
activity = %Activity{object: %Object{data: %{"type" => "Note"}}, data: %{"to" => []}}
|
||||||
|
topics = Topics.get_activity_topics(activity)
|
||||||
|
|
||||||
|
assert Enum.member?(topics, "direct")
|
||||||
|
refute Enum.member?(topics, "public")
|
||||||
|
refute Enum.member?(topics, "public:local")
|
||||||
|
refute Enum.member?(topics, "public:media")
|
||||||
|
refute Enum.member?(topics, "public:local:media")
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue