Kibou is a federated social networking server.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

activity.rs 7.8KB


  1. use database::models::{InsertActivity, QueryActivity, QueryActivityId};
  2. use database::runtime_escape;
  3. use database::schema::activities;
  4. use database::schema::activities::dsl::*;
  5. use diesel::pg::PgConnection;
  6. use diesel::query_dsl::QueryDsl;
  7. use diesel::query_dsl::RunQueryDsl;
  8. use diesel::sql_query;
  9. use diesel::ExpressionMethods;
  10. use env;
  11. use serde_json;
  12. #[derive(Clone)]
  13. pub struct Activity {
  14. pub id: i64,
  15. pub data: serde_json::Value,
  16. pub actor: String,
  17. }
  18. // Beware: This module depends on a lot of raw queries, which we should deprecate in the future. The
  19. // only reason they're being used is because Diesel.rs does not support JSONB operators that are
  20. // needed:
  21. // {->, ->>, @>, ?}
  22. //
  23. // Related issue: https://git.cybre.club/kibouproject/kibou/issues/32
  24. // Diesel.rs issue: https://github.com/diesel-rs/diesel/issues/44
  25. pub fn count_ap_object_replies_by_id(
  26. db_connection: &PgConnection,
  27. object_id: &str,
  28. ) -> Result<usize, diesel::result::Error> {
  29. match sql_query(format!(
  30. "SELECT id FROM activities WHERE data @> '{{\"object\": {{\"inReplyTo\": \"{}\"}}}}';",
  31. runtime_escape(object_id)
  32. ))
  33. .load::<QueryActivityId>(db_connection)
  34. {
  35. Ok(activity_arr) => Ok(activity_arr.len()),
  36. Err(e) => Err(e),
  37. }
  38. }
  39. pub fn count_ap_object_reactions_by_id(
  40. db_connection: &PgConnection,
  41. object_id: &str,
  42. reaction: &str,
  43. ) -> Result<usize, diesel::result::Error> {
  44. match sql_query(format!(
  45. "SELECT id FROM activities WHERE data @> '{{\"type\": \"{reaction_type}\"}}' \
  46. AND data @> '{{\"object\": \"{id}\"}}';",
  47. reaction_type = runtime_escape(reaction),
  48. id = runtime_escape(object_id)
  49. ))
  50. .load::<QueryActivityId>(db_connection)
  51. {
  52. Ok(activity_arr) => Ok(activity_arr.len()),
  53. Err(e) => Err(e),
  54. }
  55. }
  56. pub fn count_ap_notes_for_actor(
  57. db_connection: &PgConnection,
  58. actor: &str,
  59. ) -> Result<usize, diesel::result::Error> {
  60. match sql_query(format!(
  61. "SELECT id \
  62. FROM activities \
  63. WHERE data @> '{{\"actor\": \"{actor}\", \
  64. \"type\": \"Create\", \
  65. \"object\": {{\"type\": \"Note\"}}}}' \
  66. AND ((data->>'to')::jsonb ? 'https://www.w3.org/ns/activitystreams#Public' \
  67. OR (data->>'cc')::jsonb ? 'https://www.w3.org/ns/activitystreams#Public');",
  68. actor = runtime_escape(actor)
  69. ))
  70. .load::<QueryActivityId>(db_connection)
  71. {
  72. Ok(activity_arr) => Ok(activity_arr.len()),
  73. Err(e) => Err(e),
  74. }
  75. }
  76. pub fn count_local_ap_notes(db_connection: &PgConnection) -> Result<usize, diesel::result::Error> {
  77. match sql_query(format!(
  78. "SELECT id \
  79. FROM activities \
  80. WHERE data->>'type' = 'Create' \
  81. AND data->'object'->>'type' = 'Note' \
  82. AND data->>'actor' LIKE '{base_scheme}://{base_domain}/%' \
  83. AND ((data->>'to')::jsonb ? 'https://www.w3.org/ns/activitystreams#Public' \
  84. OR (data->>'cc')::jsonb ? 'https://www.w3.org/ns/activitystreams#Public')",
  85. base_scheme = env::get_value(String::from("endpoint.base_scheme")),
  86. base_domain = env::get_value(String::from("endpoint.base_domain"))
  87. ))
  88. .clone()
  89. .load::<QueryActivityId>(db_connection)
  90. {
  91. Ok(activity_arr) => Ok(activity_arr.len()),
  92. Err(e) => Err(e),
  93. }
  94. }
  95. pub fn get_activities_by_id(
  96. db_connection: &PgConnection,
  97. ids: Vec<i64>,
  98. ) -> Result<Vec<Activity>, diesel::result::Error> {
  99. let parsed_ids: Vec<String> = ids.iter().map(|num| num.to_string()).collect();
  100. match sql_query(format!(
  101. "SELECT * FROM activities WHERE id = ANY(ARRAY[{}]);",
  102. parsed_ids.join(", ")
  103. ))
  104. .load::<QueryActivity>(db_connection)
  105. {
  106. Ok(activity_arr) => Ok(activity_arr
  107. .iter()
  108. .map(|activity| serialize_activity(activity.clone()))
  109. .collect()),
  110. Err(e) => Err(e),
  111. }
  112. }
  113. pub fn get_activity_by_id(
  114. db_connection: &PgConnection,
  115. activity_id: i64,
  116. ) -> Result<Activity, diesel::result::Error> {
  117. match activities
  118. .filter(id.eq(activity_id))
  119. .limit(1)
  120. .first::<QueryActivity>(db_connection)
  121. {
  122. Ok(activity) => Ok(serialize_activity(activity)),
  123. Err(e) => Err(e),
  124. }
  125. }
  126. pub fn get_ap_activity_by_id(
  127. db_connection: &PgConnection,
  128. activity_id: &str,
  129. ) -> Result<Activity, diesel::result::Error> {
  130. match sql_query(format!(
  131. "SELECT * FROM activities WHERE data @> '{{\"id\": \"{}\"}}' LIMIT 1;",
  132. runtime_escape(activity_id)
  133. ))
  134. .clone()
  135. .load::<QueryActivity>(db_connection)
  136. {
  137. Ok(activity) => {
  138. if !activity.is_empty() {
  139. let new_activity = std::borrow::ToOwned::to_owned(&activity[0]);
  140. Ok(serialize_activity(new_activity))
  141. } else {
  142. Err(diesel::result::Error::NotFound)
  143. }
  144. }
  145. Err(e) => Err(e),
  146. }
  147. }
  148. pub fn get_ap_object_by_id(
  149. db_connection: &PgConnection,
  150. object_id: &str,
  151. ) -> Result<Activity, diesel::result::Error> {
  152. match sql_query(format!(
  153. "SELECT * FROM activities WHERE data @> '{{\"object\": {{\"id\": \"{}\"}}}}' LIMIT 1;",
  154. runtime_escape(object_id)
  155. ))
  156. .load::<QueryActivity>(db_connection)
  157. {
  158. Ok(activity) => {
  159. if !activity.is_empty() {
  160. let new_activity = std::borrow::ToOwned::to_owned(&activity[0]);
  161. Ok(serialize_activity(new_activity))
  162. } else {
  163. Err(diesel::result::Error::NotFound)
  164. }
  165. }
  166. Err(e) => Err(e),
  167. }
  168. }
  169. pub fn get_ap_object_replies_by_id(
  170. db_connection: &PgConnection,
  171. object_id: &str,
  172. ) -> Result<Vec<Activity>, diesel::result::Error> {
  173. match sql_query(format!(
  174. "SELECT * FROM activities WHERE data->'object'->>'inReplyTo' = '{}';",
  175. runtime_escape(object_id)
  176. ))
  177. .load::<QueryActivity>(db_connection)
  178. {
  179. Ok(activity_arr) => Ok(activity_arr
  180. .iter()
  181. .map(|activity| serialize_activity(activity.clone()))
  182. .collect()),
  183. Err(e) => Err(e),
  184. }
  185. }
  186. pub fn type_exists_for_object_id(
  187. db_connection: &PgConnection,
  188. _type: &str,
  189. actor: &str,
  190. object_id: &str,
  191. ) -> Result<bool, diesel::result::Error> {
  192. match sql_query(format!(
  193. "SELECT * FROM activities WHERE data @> '{{\"type\": \"{}\", \"actor\": \"{}\", \"object\": \"{}\"}}' LIMIT 1;",
  194. _type, runtime_escape(actor), runtime_escape(object_id)
  195. ))
  196. .load::<QueryActivity>(db_connection)
  197. {
  198. Ok(activity) => {
  199. if !activity.is_empty() {
  200. Ok(true)
  201. } else {
  202. Ok(false)
  203. }
  204. }
  205. Err(e) => Err(e),
  206. }
  207. }
  208. pub fn serialize_activity(sql_activity: QueryActivity) -> Activity {
  209. Activity {
  210. id: sql_activity.id,
  211. data: sql_activity.data,
  212. actor: sql_activity.actor_uri,
  213. }
  214. }
  215. pub fn deserialize_activity(activity: &Activity) -> InsertActivity {
  216. InsertActivity {
  217. data: &activity.data,
  218. actor_uri: &activity.actor,
  219. }
  220. }
  221. pub fn insert_activity(db_connection: &PgConnection, activity: Activity) -> Activity {
  222. let new_activity = deserialize_activity(&activity);
  223. serialize_activity(
  224. diesel::insert_into(activities::table)
  225. .values(&new_activity)
  226. .get_result(db_connection)
  227. .expect("Error creating activity"),
  228. )
  229. }
  230. pub fn delete_ap_activity_by_id(db_connection: &PgConnection, activity_id: String) {
  231. sql_query(format!(
  232. "DELETE FROM activities WHERE data->>'id' = '{}';",
  233. runtime_escape(&activity_id)
  234. ))
  235. .execute(db_connection);
  236. }
  237. pub fn delete_ap_object_by_id(db_connection: &PgConnection, object_id: String) {
  238. sql_query(format!(
  239. "DELETE FROM activities WHERE data->'object'->>'id' = '{}';",
  240. runtime_escape(&object_id)
  241. ))
  242. .execute(db_connection);
  243. }