# 这段代码演示了如何使用 Apache IoTDB 的 Python 客户端来管理时序数据。
# 包括创建和删除存储组、创建和删除时间序列、插入数据以及查询数据等操作。
import time
# 导入依赖库
# 初始化 IoTDB 会话
from iotdb.Session import Session
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
# 创建会话连接
ip = "192.168.19.162"
port_ = "6667"
username_ = "root"
password_ = "root"
# 使用单个节点建立会话
# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True)
# 使用多个节点建立会话
session = Session.init_from_node_urls(
node_urls=["192.168.19.162:6667"],
user="root",
password="root",
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True,
)
session.open(False)
# 向数据库插入一条记录
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
timestamp = int(time.time() * 1000)
session.insert_record("root.sg_test_01.d_01",timestamp , measurements_, data_types_, values_)
# 关闭会话连接
session.close()
print("运行成功 All executions done!!")
# 这段代码演示了如何使用 Apache IoTDB 的 Python 客户端来管理时序数据。
# 包括创建和删除存储组、创建和删除时间序列、插入数据以及查询数据等操作。
# 导入依赖库
import numpy as np
# 初始化 IoTDB 会话
from iotdb.Session import Session
from iotdb.template.MeasurementNode import MeasurementNode
from iotdb.template.Template import Template
from iotdb.utils.BitMap import BitMap
from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor
from iotdb.utils.Tablet import Tablet
from iotdb.utils.NumpyTablet import NumpyTablet
# 创建会话连接
ip = "127.0.0.1"
port_ = "6667"
username_ = "root"
password_ = "root"
# 使用单个节点建立会话
# session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8", enable_redirection=True)
# 使用多个节点建立会话
session = Session.init_from_node_urls(
node_urls=["127.0.0.1:6667", "127.0.0.1:6668", "127.0.0.1:6669"],
user="root",
password="root",
fetch_size=1024,
zone_id="UTC+8",
enable_redirection=True,
)
session.open(False)
# 创建和删除数据库
session.set_storage_group("root.sg_test_01")
session.set_storage_group("root.sg_test_02")
session.set_storage_group("root.sg_test_03")
session.set_storage_group("root.sg_test_04")
session.delete_storage_group("root.sg_test_02")
session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"])
# 设置时间序列
session.create_time_series(
"root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY
)
session.create_time_series(
"root.sg_test_01.d_02.s_01",
TSDataType.BOOLEAN,
TSEncoding.PLAIN,
Compressor.SNAPPY,
None,
{"tag1": "v1"},
{"description": "v1"},
"temperature",
)
# 批量设置时间序列
ts_path_lst_ = [
"root.sg_test_01.d_01.s_04",
"root.sg_test_01.d_01.s_05",
"root.sg_test_01.d_01.s_06",
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_
)
ts_path_lst_ = [
"root.sg_test_01.d_02.s_04",
"root.sg_test_01.d_02.s_05",
"root.sg_test_01.d_02.s_06",
"root.sg_test_01.d_02.s_07",
"root.sg_test_01.d_02.s_08",
"root.sg_test_01.d_02.s_09",
]
data_type_lst_ = [
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))]
compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))]
tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))]
attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))]
session.create_multi_time_series(
ts_path_lst_,
data_type_lst_,
encoding_lst_,
compressor_lst_,
None,
tags_lst_,
attributes_lst_,
None,
)
# 删除时间序列
session.delete_time_series(
[
"root.sg_test_01.d_01.s_07",
"root.sg_test_01.d_01.s_08",
"root.sg_test_01.d_01.s_09",
]
)
# 检查时间序列是否存在
print(
"s_07 expecting False, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_07"),
)
print(
"s_03 expecting True, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_01.s_03"),
)
print(
"d_02.s_01 expecting True, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_02.s_01"),
)
print(
"d_02.s_06 expecting True, checking result: ",
session.check_time_series_exists("root.sg_test_01.d_02.s_06"),
)
# 向数据库插入一条记录
measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"]
values_ = [False, 10, 11, 1.1, 10011.1, "test_record"]
data_types_ = [
TSDataType.BOOLEAN,
TSDataType.INT32,
TSDataType.INT64,
TSDataType.FLOAT,
TSDataType.DOUBLE,
TSDataType.TEXT,
]
session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_)
# 批量插入记录到数据库
measurements_list_ = [
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"],
]
values_list_ = [
[False, 22, 33, 4.4, 55.1, "test_records01"],
[True, 77, 88, 1.25, 8.125, bytes("test_records02", "utf-8")],
]
data_type_list_ = [data_types_, data_types_]
device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"]
session.insert_records(
device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_
)
# 向数据库插入一个数据块(Tablet)
values_ = [
[False, 10, 11, 1.1, 10011.1, "test01"],
[True, 100, 11111, 1.25, 101.0, "test02"],
[False, 100, 1, 188.1, 688.25, "test03"],
[True, 0, 0, 0, 6.25, "test04"],
]
timestamps_ = [4, 5, 6, 7]
tablet_ = Tablet(
"root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_
)
session.insert_tablet(tablet_)
# 向数据库插入一个 Numpy 数据块(NumpyTablet)
np_values_ = [
np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()),
np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()),
np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()),
np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()),
np.array([10011.1, 101.0, 688.25, 6.25], TSDataType.DOUBLE.np_dtype()),
np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()),
]
np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype())
np_tablet_ = NumpyTablet(
"root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_
)
session.insert_tablet(np_tablet_)
# 关闭会话连接
session.close()
print("All executions done!!")
发表评论