写在前面的话
工作算啥。昨天下午,我正在老妈卧室搞东西。二宝逮住机会,使劲磨我,我好几次说,你去打僵尸或看ipad,要不然我打你了。好几次无效,我失去了耐心,卡卡打了二宝,孩子哇哇哭。老妈把孩子领客厅了,孩子哭声哄不下,媳妇把孩子领卧室了,哄了好一会孩子才委屈的睡去。在此期间,我备受煎熬,痛苦的要命,好想扇自己嘴巴子,为啥要打孩子,二宝四岁了,这是第二次打他。我发现啥也干不了了,一支一支的抽烟,想想,工作算个啥,工作比孩子还重要吗,今天不做了又会怎么样呢,把孩子打的吱哇乱叫,值得吗?最后得出的结论是:孩子重要。即使这个项目黄了,挣的钱少了,给孩子吃普通的饭菜就可以,再找下个项目呗。如果确实有急活,可以去图书馆啊,为什么要打孩子呢。以后的底线是不打孩子,工作做不下去就停止或换地方,此记!
[227+100]-------底部有张生活照
(头条号运营:大家想全托管上号的联系我哦,每天让你得个早餐钱,wx: qhz198607)
【关键词】:python、ragflow、es、数据并发
一、es数据并发同步(三级)
描述:现在想着es能并发同步,现在还没做到,想法搞一搞。
开工:
第一步:传参数(四级)
20250410周四时间段:16:43-17:00
现在插入一批成功需要4分钟,截图如下:
图1a-1
注:这个需要优化。先看下,现在es的性能。
第二步:es性能(四级)
20250410周四时间段:20:20-21:00
查下资料,先查下分片上的文档数量,如下:
//查下旧索引每个分片上的数据量
GET /ragflow_7d19a176807611efb0f80242ac120006/_search?preference=_shards:0
{
"size": 0,
"track_total_hits": true
}
注:运行截图如下:
图1a-2
注:接下来,查一下,这个分片上,_sync_source2=1的数据量是多少。
第三步:_sync_source2=1(四级)
20250410周四时间段:20:38-21:00
写程序如下:
GET /ragflow_7d19a176807611efb0f80242ac120006/_search?preference=_shards:0
{
"query": {
"term": {
"_sync_source2": 1
}
},
"size": 0,
"track_total_hits": true
}
注:运行结果如下:
图1a-3
注:接下来,分两个片跑数据,这样会快点。
接下来,修改同步脚本。
第四步:分片跑数据(四级)
20250410周四时间段:23:08-00:00
先把脚本迁移一下,迁移到龙哥指定的位置,截图如下:
图1a-4
注:接下来,改下文件,跑下试试。
第五步:改文件到脚本(四级)
20250410周四时间段:23:13-00:00
添加脚本如下:
图1a-5
注:这个是能跑起来的,先这样。接下来,做分片查询。
按分片同步,这样能快点。
第六片:分片数据查询(四级)
20250410周四时间段:23:32-00:00
程序如下:
query = {
"query": {
"bool": {
"must_not": {
"exists": {"field": "_sync_source2"}
}
}
},
"size": batch_size
}
resp = es.options(request_timeout=120).search(index=old_index,
body=query, scroll=scroll_time,preference=f"_shards:{shard_id}" ) 注:这样写就能按片搜索,效果还可以。接下来,运行报错处理一下。
二、es同步数据报错处理(三级)
描述:现在运行es数据同步脚本,报错了,如下:
图1b-1
注:把这个错误处理一下。
开工:
第一步:处理错误(四级)
20250411周五时间段:10:24-11:00
错误文字版如下:
2025-04-11 09:35:24.159 | ERROR | es_sync0.py:migrate_data:150 - 迁移失败:
2025-04-11 09:35:24.159 | ERROR | es_sync0.py:migrate_data:158 - 全局错误: Working outside of application context.
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
Traceback (most recent call last):
File "/home/admin/python_projects/ragflow_pub/rag/svr/es_sync0.py", line 165, in <module>
migrate_data()
^^^^^^^^^^^^^^
File "/home/admin/python_projects/ragflow_pub/rag/svr/es_sync0.py", line 159, in migrate_data
return jsonify({
^^^^^^^^^
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/flask/json/__init__.py", line 170, in jsonify
return current_app.json.response(*args, **kwargs) # type: ignore[return-value]
^^^^^^^^^^^^^^^^
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/werkzeug/local.py", line 318, in __get__
obj = instance._get_current_object()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/werkzeug/local.py", line 519, in _get_current_object
raise RuntimeError(unbound_message) from None
RuntimeError: Working outside of application context.
This typically means that you attempted to use functionality that needed
the current application. To solve this, set up an application context
with app.app_context(). See the documentation for more information.
先查下资料,原因是:
【没有使用脚本那一套】
解决方案如下:
调用方法时,如下写:
from flask import Flask
app = Flask(__name__)
with app.app_context():
migrate_data()
注:再次运行报错,如下:
admin@zero4 ragflow_pub/rag/svr git:(v0.16.0-b [!]) ➜ python es_sync0.py
OpenBLAS blas_thread_init: pthread_create failed for thread 21 of 24: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 256312 current, 256312 max
OpenBLAS blas_thread_init: pthread_create failed for thread 22 of 24: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 256312 current, 256312 max
OpenBLAS blas_thread_init: pthread_create failed for thread 23 of 24: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 256312 current, 256312 max
Traceback (most recent call last):
File "/home/admin/python_projects/ragflow_pub/rag/svr/es_sync0.py", line 5, in <module>
from rag.settings import log
File "/home/admin/python_projects/ragflow_pub/rag/__init__.py", line 19, in <module>
import xgboost
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/xgboost/__init__.py", line 9, in <module>
from .core import DMatrix, DeviceQuantileDMatrix, Booster, DataIter
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/xgboost/core.py", line 20, in <module>
import numpy as np
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/numpy/__init__.py", line 130, in <module>
from numpy.__config__ import show as show_config
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/numpy/__config__.py", line 4, in <module>
from numpy.core._multiarray_umath import (
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/numpy/core/__init__.py", line 24, in <module>
from . import multiarray
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/numpy/core/multiarray.py", line 10, in <module>
from . import overrides
File "/home/admin/python_projects/ragflow_pub/.venv/lib/python3.12/site-packages/numpy/core/overrides.py", line 8, in <module>
from numpy.core._multiarray_umath import (
File "<frozen importlib._bootstrap>", line 463, in _lock_unlock_module
KeyboardInterrupt
注:处理一下。
第二步:处理thread(四级)
20250411周五时间段:10:37-11:00
解决方案如下:
OpenBLAS 线程初始化错误
出现的警告信息:
OpenBLAS blas_thread_init: pthread_create failed for thread 21 of 24: Resource temporarily unavailable
OpenBLAS blas_thread_init: RLIMIT_NPROC 256312 current, 256312 max
原因:
解决方案:
# 临时解决方案:限制OpenBLAS使用的线程数
export OPENBLAS_NUM_THREADS=4
export OMP_NUM_THREADS=4
# 或者更激进地限制为单线程
export OPENBLAS_NUM_THREADS=1
export OMP_NUM_THREADS=1
注:运行时,截图如下:
图1b-2
注:把这两条写到代码的注释里,下次好找。
接下来,对数据进行分类,可以分布跑,还有,在跑脚本时,怎么传个变量进去,这样就可以写一个脚本了。
第三步:脚本传参数(四级)
20250411周五时间段:11:18-12:00
运行时 python es_sync0.py,运行脚本时,怎么传一个参数shard_id进去,以同步不同片上的数据,查询如下:
要为脚本添加 shard_id 参数输入功能,可以通过以下几种方式实现:
方法1:使用命令行参数(推荐)
import argparse # 在文件开头添加
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--shard_id", type=int, default=0, help="Elasticsearch shard ID to process")
args = parser.parse_args()
migrate_data(args.shard_id) # 修改函数签名接收参数
if __name__ == "__main__":
main()
注:用这个方式进行修改。
第四步:索引数据分类(四级)
三、头条号战果汇报
昨日数据来啦,累计总收入:6339.4,昨日总收入:180.1 ,昨日总播放:244.1万,可提现总金额:1715.2,软件截图如下:
图1c-1
注:想要全脱管运营头条号的联系我,你出账号,我来运营,收益四六分成(你六我四),你当甩手掌柜,每天都能得几块零花钱,财富wx: 17701328814,也可以加群先了解一下。
图1c-2
四、生活照片
拍摄于2025年12月24日,22:04:06,带二宝在家里玩,当时二宝三岁两个月。昨天晚上时,二宝醒了,我给二宝说:宝,不管你听懂听不懂,爸爸下午打你,爸爸错了,向你道歉,我发誓在你长到18岁之前,我不再打你一下。二宝说:爸你说错了,我叫诚诚。我说:对对,诚诚,爸爸说到做到。两个儿子,钱梁,钱诚。大宝小时候打了之后,说不打他了,二宝这次也是,以后不打了。孩子是拿来爱的,不是打的,打孩子只能说明自己的无能,但爱孩子也要有个度,不能向老爸一样,为了不让孩子受苦,干脆不出去打工了,天天陪孩子。我要在老爸的版本上做升级,在家就爱孩子,感觉孩子打扰的我工作不下去,我就去图书馆或咖啡厅,不能因为工作打孩子,但工作必须继续,加油!
图1d-1
《本文完》