1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
|
import argparse
import logging
from pathlib import Path
import traceback
import json
import os
import requests
from datetime import datetime
# Logging setup: records at DEBUG level and above are sent to both the
# 'granola_sync.log' file and a stream handler (standard error by default).
logging.basicConfig(
    handlers=[
        logging.FileHandler('granola_sync.log'),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.DEBUG,
)
logger = logging.getLogger(__name__)
def load_credentials():
    """
    Load the Granola access token from the local supabase.json file.

    Reads ``~/Library/Application Support/Granola/supabase.json``, decodes the
    JSON string stored under its ``cognito_tokens`` key, and returns the
    ``access_token`` value found inside it.

    Returns:
        The access token, or None when the file is missing, cannot be read or
        parsed, or contains no access token. Every failure is logged.
    """
    credentials_file = Path.home() / "Library/Application Support/Granola/supabase.json"
    if not credentials_file.exists():
        logger.error(f"凭据文件未找到于: {credentials_file}")
        return None

    try:
        with open(credentials_file, 'r') as handle:
            payload = json.load(handle)
        # cognito_tokens holds a JSON-encoded string, so it is decoded a
        # second time to get at the token dictionary.
        tokens = json.loads(payload['cognito_tokens'])
        token = tokens.get('access_token')
    except Exception as err:
        logger.error(f"读取凭据文件时出错: {err!s}")
        return None

    if not token:
        logger.error("在凭据文件中未找到访问令牌")
        return None

    logger.debug("成功加载凭据")
    return token
def fetch_granola_documents(token, timeout=30):
    """
    Fetch documents from the Granola API.

    Sends a single POST request to the ``get-documents`` endpoint asking for
    up to 100 documents starting at offset 0, with each document's last
    viewed panel included.

    Args:
        token: Bearer token placed in the ``Authorization`` header.
        timeout: Seconds to wait for the server before giving up. Passed
            straight to ``requests.post``; without it a stalled connection
            would block the sync indefinitely.

    Returns:
        The decoded JSON response body, or None when the request fails, the
        server answers with an error status, or the body is not valid JSON.
        Failures are logged.
    """
    url = "https://api.granola.ai/v2/get-documents"
    headers = {
        "Authorization": f"Bearer {token}",
        "Content-Type": "application/json",
        "Accept": "*/*",
        "User-Agent": "Granola/5.354.0",
        "X-Client-Version": "5.354.0",
    }
    # NOTE(review): only one page is requested. Accounts holding more than
    # `limit` documents presumably need paging via `offset` - confirm
    # against the API before relying on a complete sync.
    data = {
        "limit": 100,
        "offset": 0,
        "include_last_viewed_panel": True,
    }

    try:
        response = requests.post(url, headers=headers, json=data, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except Exception as e:
        # Deliberately broad: any network, HTTP or decoding failure is
        # reported to the caller as "no documents" rather than a crash.
        logger.error(f"获取文档时出错: {str(e)}")
        return None
def convert_prosemirror_to_markdown(content):
    """
    Convert a ProseMirror JSON document into Markdown text.

    Handled node types:
      * ``heading``    -> ``#`` repeated ``attrs.level`` times (default 1),
                          followed by the heading text and a blank line.
      * ``paragraph``  -> the paragraph text followed by a blank line.
      * ``bulletList`` -> one ``- `` line per ``listItem`` child (item text
                          is stripped), followed by a blank line.
      * ``text``       -> the node's ``text`` value.
    Any other node type contributes the concatenated output of its children.

    Malformed pieces (null ``content``/``attrs``/``text``, non-dict children)
    are treated as empty instead of raising.

    Args:
        content: Root ProseMirror node, a dict that has a ``content`` key.

    Returns:
        The Markdown string, or "" when ``content`` is empty, not a dict, or
        lacks a ``content`` key.
    """
    if not content or not isinstance(content, dict) or 'content' not in content:
        return ""

    def child_nodes(node):
        # "content" may be missing or explicitly null; both mean no children.
        children = node.get('content')
        return children if isinstance(children, list) else []

    def render_children(node):
        return ''.join(process_node(child) for child in child_nodes(node))

    def process_node(node):
        if not isinstance(node, dict):
            return ""

        node_type = node.get('type', '')

        if node_type == 'text':
            text = node.get('text', '')
            return text if isinstance(text, str) else ''

        if node_type == 'heading':
            attrs = node.get('attrs')
            level = attrs.get('level', 1) if isinstance(attrs, dict) else 1
            if not isinstance(level, int):
                level = 1
            return f"{'#' * level} {render_children(node)}\n\n"

        if node_type == 'paragraph':
            return f"{render_children(node)}\n\n"

        if node_type == 'bulletList':
            items = [
                f"- {render_children(item).strip()}"
                for item in child_nodes(node)
                if isinstance(item, dict) and item.get('type') == 'listItem'
            ]
            return '\n'.join(items) + '\n\n'

        # Unknown container (including the root "doc" node): render children.
        return render_children(node)

    return process_node(content)
def sanitize_filename(title):
    """
    Convert a title into a usable file name (without extension).

    Removes the characters ``< > : " / \\ | ? *`` and ASCII control
    characters (code points below 32 and DEL), then replaces spaces with
    underscores.

    Args:
        title: The title to convert. None or an empty string is accepted.

    Returns:
        The sanitized name, or ``"untitled"`` when nothing usable remains, so
        the caller never ends up with an empty file name.
    """
    fallback = "untitled"
    if not title:
        return fallback

    # Characters that are not allowed in file names.
    invalid_chars = set('<>:"/\\|?*')
    kept = ''.join(
        ch for ch in title
        if ch not in invalid_chars and ord(ch) >= 32 and ord(ch) != 127
    )
    # Replace spaces with underscores.
    kept = kept.replace(' ', '_')
    return kept or fallback
def _extract_panel_content(doc):
    """Return the ProseMirror root node of the doc's last viewed panel, or None if it is absent or not a 'doc' node."""
    panel = doc.get("last_viewed_panel")
    if not isinstance(panel, dict):
        return None
    content = panel.get("content")
    if isinstance(content, dict) and content.get("type") == "doc":
        return content
    return None


def _build_frontmatter(doc, doc_id, title):
    """Build the YAML frontmatter block (id, title, optional timestamps) for one note."""
    lines = [
        "---",
        f"granola_id: {doc_id}",
        # json.dumps yields a double-quoted scalar with quotes, backslashes
        # and control characters escaped, which YAML accepts as well.
        f"title: {json.dumps(title, ensure_ascii=False)}",
    ]
    for key in ("created_at", "updated_at"):
        if doc.get(key):
            lines.append(f"{key}: {doc.get(key)}")
    lines.append("---")
    return "\n".join(lines) + "\n\n"


def main():
    """
    Sync Granola notes into an existing folder as Markdown files.

    Steps: parse the ``output_dir`` command-line argument, load the access
    token, fetch the documents, then for every document that has a usable
    last-viewed panel write ``<sanitized title>.md`` containing a YAML
    frontmatter block followed by the converted Markdown.

    The function returns early (after logging an error) when the output
    directory does not exist, credentials cannot be loaded, or the API
    response is empty or has no ``docs`` key. A failure while processing one
    document is logged and does not stop the remaining documents.
    """
    logger.info("开始Granola同步进程")

    parser = argparse.ArgumentParser(description="获取Granola笔记并将其保存为Markdown文件到Obsidian文件夹。")
    parser.add_argument("output_dir", type=str, help="保存笔记的Obsidian子文件夹的完整路径。")
    args = parser.parse_args()

    output_path = Path(args.output_dir)
    logger.info(f"输出目录设置为: {output_path}")
    if not output_path.is_dir():
        logger.error(f"输出目录'{output_path}'不存在或不是目录。")
        logger.error("请先创建该目录。")
        return

    logger.info("尝试加载凭据...")
    token = load_credentials()
    if not token:
        logger.error("加载凭据失败。退出。")
        return

    logger.info("凭据加载成功。从Granola API获取文档...")
    api_response = fetch_granola_documents(token)
    if not api_response:
        logger.error("获取文档失败 - API响应为空")
        return
    if "docs" not in api_response:
        logger.error("API响应格式意外 - 未找到'docs'键")
        logger.debug(f"API响应: {api_response}")
        return

    documents = api_response["docs"]
    logger.info(f"成功从Granola获取{len(documents)}个文档")

    synced_count = 0
    for doc in documents:
        # A .get() default only covers a missing key; `or` also covers a
        # title that is present but null or empty.
        title = doc.get("title") or "未命名Granola笔记"
        doc_id = doc.get("id", "unknown_id")
        logger.info(f"处理文档: {title} (ID: {doc_id})")

        content_to_parse = _extract_panel_content(doc)
        if not content_to_parse:
            logger.warning(f"跳过文档'{title}' (ID: {doc_id}) - 在'last_viewed_panel'中未找到合适的内容")
            continue
        logger.debug(f"找到文档内容可解析: {title}")

        try:
            logger.debug(f"将文档转换为markdown: {title}")
            markdown_content = convert_prosemirror_to_markdown(content_to_parse)

            # Prepend a frontmatter block carrying the note's metadata.
            final_markdown = _build_frontmatter(doc, doc_id, title) + markdown_content

            # The file name depends on the title only, so two notes whose
            # titles sanitize to the same name overwrite each other.
            filename = sanitize_filename(title) + ".md"
            filepath = output_path / filename
            logger.debug(f"写入文件到: {filepath}")
            with open(filepath, 'w', encoding='utf-8') as f:
                f.write(final_markdown)
            logger.info(f"成功保存: {filepath}")
            synced_count += 1
        except Exception as e:
            # Keep going: one bad document must not abort the whole sync.
            logger.error(f"处理文档'{title}'时出错 (ID: {doc_id}): {str(e)}")
            logger.debug("完整回溯:", exc_info=True)

    logger.info(f"同步完成。{synced_count}个笔记已保存到'{output_path}'")
# Run the sync only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|