赞
踩
从采集到存储:时序数据库到底怎么处理时间?
iotdb官方文档手册
可以使用docker volume create命令创建 docker 卷。此命令将在/var/lib/docker/volumes目录中创建一个卷。
docker search iotdb docker pull apache/iotdb (1)创建数据文件和日志的 docker 挂载目录 docker volume create mydata docker volume create mylogs (2)运行docker容器 docker run --name iotdb -p 6667:6667 -v mydata:/iotdb/data -v mylogs:/iotdb/logs -d apache/iotdb:latest /iotdb/bin/start-server.sh (3)可以使用docker ps来检查是否运行成功 (4)获取container的ID docker container ls (5)进入容器 docker exec -it iotdb /bin/bash (6)登陆IotDB /iotdb/sbin/start-cli.sh -h localhost -p 6667 -u root -pw root
要求Python >=3.7
pip3 install apache-iotdb
from iotdb.Session import Session
ip = "192.168.43.212"
port_= "6667"
username_ = "root"
password_ = "root"
session = Session(ip,port_,username_,password_)
session.open(enable_rpc_compression=False)
zone = session.get_time_zone()
session.close()
设置存储组
session.set_storage_group(group_name)
删除一个或多个存储组
session.delete_storage_group(group_name)
session.delete_storage_groups(group_name_lst)
命令行操作
IoTDB> SET STORAGE GROUP TO root.in
创建一个或多个时间序列 session.create_time_series(ts_path, data_type, encoding, compressor, props=None, tags=None, attributes=None, alias=None) session.create_multi_time_series( ts_path_lst, data_type_lst, encoding_lst, compressor_lst, props_lst=None, tags_lst=None, attributes_lst=None, alias_lst=None ) 创建对齐的时间序列 session.create_aligned_time_series( device_id, measurements_lst, data_type_lst, encoding_lst, compressor_lst ) 删除一个或多个时间序列 session.delete_time_series(paths_list) 核查指定的时间序列是否存在 session.check_time_series_exists(path)
命令行
IoTDB> CREATE TIMESERIES root.in.wf01.wt01.status WITH DATATYPE=BOOLEAN, ENCODING=PLAIN
IoTDB> CREATE TIMESERIES root.in.wf01.wt01.temperature WITH DATATYPE=FLOAT, ENCODING=RLE
IoTDB> show timeseries
参数说明
create single time series
:param ts_path: String, complete time series path (starts from root)
:param data_type: TSDataType, data type for this time series
:param encoding: TSEncoding, encoding for this time series
:param compressor: Compressor, compressing type for this time series
:param props: Dictionary, properties for time series
:param tags: Dictionary, tag map for time series
:param attributes: Dictionary, attribute map for time series
:param alias: String, measurement alias for time series
from iotdb.Session import Session
# creating session connection.
ip = "192.168.124.26"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
print(session.get_time_zone())
session.open(False)
session.close()
from iotdb.Session import Session # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # set and delete storage groups session.set_storage_group("root.sg_test_01") session.set_storage_group("root.sg_test_02") session.set_storage_group("root.sg_test_03") session.set_storage_group("root.sg_test_04") session.delete_storage_group("root.sg_test_02") session.delete_storage_groups(["root.sg_test_03", "root.sg_test_04"]) session.close()
命令行操作
IoTDB> show storage group
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # set storage groups session.set_storage_group("root.sg_test_01") # setting time series. session.create_time_series( "root.sg_test_01.d_01.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY) session.create_time_series( "root.sg_test_01.d_01.s_02", TSDataType.INT32, TSEncoding.PLAIN, Compressor.SNAPPY) session.create_time_series( "root.sg_test_01.d_01.s_03", TSDataType.INT64, TSEncoding.PLAIN, Compressor.SNAPPY) session.create_time_series( "root.sg_test_01.d_02.s_01", TSDataType.BOOLEAN, TSEncoding.PLAIN, Compressor.SNAPPY, None, {"tag1": "v1"}, {"description": "v1"}, "temperature", ) session.close()
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # set storage groups session.set_storage_group("root.sg_test_01") # setting multiple time series once. ts_path_lst_ = [ "root.sg_test_01.d_01.s_04", "root.sg_test_01.d_01.s_05", "root.sg_test_01.d_01.s_06", "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09", ] data_type_lst_ = [ TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] session.create_multi_time_series( ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_ ) session.close()
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # set storage groups session.set_storage_group("root.sg_test_01") # setting multiple time series once. ts_path_lst_ = [ "root.sg_test_01.d_02.s_04", "root.sg_test_01.d_02.s_05", "root.sg_test_01.d_02.s_06", "root.sg_test_01.d_02.s_07", "root.sg_test_01.d_02.s_08", "root.sg_test_01.d_02.s_09", ] data_type_lst_ = [ TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] encoding_lst_ = [TSEncoding.PLAIN for _ in range(len(data_type_lst_))] compressor_lst_ = [Compressor.SNAPPY for _ in range(len(data_type_lst_))] tags_lst_ = [{"tag2": "v2"} for _ in range(len(data_type_lst_))] attributes_lst_ = [{"description": "v2"} for _ in range(len(data_type_lst_))] session.create_multi_time_series( ts_path_lst_, data_type_lst_, encoding_lst_, compressor_lst_, None, tags_lst_, attributes_lst_, None, ) session.close()
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # delete time series session.delete_time_series( [ "root.sg_test_01.d_01.s_07", "root.sg_test_01.d_01.s_08", "root.sg_test_01.d_01.s_09", ] ) session.close()
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # checking time series print( "s_07 expecting False, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_07"), ) print( "s_03 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_01.s_03"), ) print( "d_02.s_01 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_02.s_01"), ) print( "d_02.s_06 expecting True, checking result: ", session.check_time_series_exists("root.sg_test_01.d_02.s_06"), ) session.close()
from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # insert one record into the database. measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] values_ = [False, 10, 11, 1.1, 10011.1, "test_record"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] session.insert_record("root.sg_test_01.d_01", 1, measurements_, data_types_, values_) session.close()
命令行
IoTDB> select * from root.sg_test_01.d_01
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_list_ = [ ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"], ] values_list_ = [ [False, 22, 33, 4.4, 55.1, "test_records01"], [True, 77, 88, 1.25, 8.125, "test_records02"], ] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] data_type_list_ = [data_types_, data_types_] device_ids_ = ["root.sg_test_01.d_01", "root.sg_test_01.d_01"] session.insert_records( device_ids_, [2, 3], measurements_list_, data_type_list_, values_list_ ) session.close()
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor # creating session connection. from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] # insert one tablet into the database. values_ = [ [False, 10, 11, 1.1, 10011.1, "test01"], [True, 100, 11111, 1.25, 101.0, "test02"], [False, 100, 1, 188.1, 688.25, "test03"], [True, 0, 0, 0, 6.25, "test04"], ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. timestamps_ = [4, 5, 6, 7] tablet_ = Tablet( "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ ) session.insert_tablet(tablet_) session.close()
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import numpy as np # creating session connection. from iotdb.utils.NumpyTablet import NumpyTablet from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] # insert one numpy tablet into the database. np_values_ = [ np.array([False, True, False, True], TSDataType.BOOLEAN.np_dtype()), np.array([10, 100, 100, 0], TSDataType.INT32.np_dtype()), np.array([11, 11111, 1, 0], TSDataType.INT64.np_dtype()), np.array([1.1, 1.25, 188.1, 0], TSDataType.FLOAT.np_dtype()), np.array([10011.1, 101.0, 689.25, 6.25], TSDataType.DOUBLE.np_dtype()), np.array(["test01", "test02", "test03", "test04"], TSDataType.TEXT.np_dtype()), ] np_timestamps_ = np.array([1, 2, 3, 4], TSDataType.INT64.np_dtype()) np_tablet_ = NumpyTablet( "root.sg_test_01.d_02", measurements_, data_types_, np_values_, np_timestamps_ ) session.insert_tablet(np_tablet_) session.close()
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import numpy as np # creating session connection. from iotdb.utils.NumpyTablet import NumpyTablet from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] # insert one unsorted numpy tablet into the database. np_values_unsorted = [ np.array([False, False, False, True, True], np.dtype(">?")), np.array([0, 10, 100, 1000, 10000], np.dtype(">i4")), np.array([1, 11, 111, 1111, 11111], np.dtype(">i8")), np.array([1.1, 1.25, 188.1, 0, 8.999], np.dtype(">f4")), np.array([10011.1, 101.0, 688.25, 6.25, 8, 776], np.dtype(">f8")), np.array(["test09", "test08", "test07", "test06", "test05"]), ] np_timestamps_unsorted = np.array([9, 8, 7, 6, 5], np.dtype(">i8")) np_tablet_unsorted = NumpyTablet( "root.sg_test_01.d_02", measurements_, data_types_, np_values_unsorted, np_timestamps_unsorted, ) session.insert_tablet(np_tablet_unsorted) print(np_tablet_unsorted.get_timestamps()) for value in np_tablet_unsorted.get_values(): print(value)
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import numpy as np # creating session connection. from iotdb.utils.NumpyTablet import NumpyTablet from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] values_ = [ [False, 10, 11, 1.1, 10011.1, "test01"], [True, 100, 11111, 1.25, 101.0, "test02"], [False, 100, 1, 188.1, 688.25, "test03"], [True, 0, 0, 0, 6.25, "test04"], ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. # insert multiple tablets into database tablet_01 = Tablet( "root.sg_test_01.d_01", measurements_, data_types_, values_, [8, 9, 10, 11] ) tablet_02 = Tablet( "root.sg_test_01.d_01", measurements_, data_types_, values_, [12, 13, 14, 15] ) session.insert_tablets([tablet_01, tablet_02]) session.close()
空值会自动补。
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import numpy as np # creating session connection. from iotdb.utils.NumpyTablet import NumpyTablet from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) measurements_ = ["s_01", "s_02", "s_03", "s_04", "s_05", "s_06"] data_types_ = [ TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64, TSDataType.FLOAT, TSDataType.DOUBLE, TSDataType.TEXT, ] # insert one tablet with empty cells into the database. values_ = [ [None, 10, 11, 1.1, 10011.1, "test01"], [True, None, 11111, 1.25, 101.0, "test02"], [False, 100, 1, None, 688.25, "test03"], [True, 0, 0, 0, 6.25, None], ] # Non-ASCII text will cause error since bytes can only hold 0-128 nums. timestamps_ = [16, 17, 18, 19] tablet_ = Tablet( "root.sg_test_01.d_01", measurements_, data_types_, values_, timestamps_ ) session.insert_tablet(tablet_) session.close()
会覆盖以前的数据
# insert multiple records into database from iotdb.Session import Session from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import numpy as np # creating session connection. from iotdb.utils.NumpyTablet import NumpyTablet from iotdb.utils.Tablet import Tablet ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # insert records of one device time_list = [1, 2, 3] measurements_list = [ ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ["s_01", "s_02", "s_03"], ] data_types_list = [ [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], [TSDataType.BOOLEAN, TSDataType.INT32, TSDataType.INT64], ] values_list = [[False, 22, 33], [True, 1, 23], [False, 15, 26]] session.insert_records_of_one_device( "root.sg_test_01.d_01", time_list, measurements_list, data_types_list, values_list ) session.close()
from iotdb.Session import Session
ip = "192.168.124.26"
port_ = "6667"
username_ = "root"
password_ = "root"
session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8")
print(session.get_time_zone())
session.open(False)
# execute non-query sql statement
session.execute_non_query_statement(
"insert into root.sg_test_01.d_01(timestamp, s_02) values(16, 188)"
)
session.close()
from iotdb.Session import Session ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # execute sql query statement #sql_str = "select * from root.sg_test_01.d_01" sql_str = "select s_01, s_02, s_03, s_04, s_05, s_06 from root.sg_test_01.d_02" with session.execute_query_statement(sql_str) as session_data_set: session_data_set.set_fetch_size(1024) while session_data_set.has_next(): print(session_data_set.next()) session.close()
from iotdb.Session import Session import pandas as pd ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024, zone_id="UTC+8") print(session.get_time_zone()) session.open(False) # execute sql query statement sql_str = "select * from root.sg_test_01.d_01" with session.execute_query_statement(sql_str) as session_data_set: df = session_data_set.todf() session.close()
写入后时间戳会减去8小时,读取后正常。
from iotdb.Session import Session # creating session connection. from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import time from datetime import datetime # 字符串时间戳转换为整数时间戳 def tostamp_int(timestr): datetime_obj = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) return obj_stamp ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024) print(session.get_time_zone()) session.open(False) devid = "667601E99" pointid = "01" kpiid = "环境温度" timestr1 = "2022-08-31 13:18:55.786" timestr2 = "2022-08-31 13:18:56.505" ts_path = "root.main.{}.{}.{}".format(devid,pointid,kpiid) # 创建存储组 session.set_storage_group("root.main") # 创建时间序列 session.create_time_series(ts_path, TSDataType.FLOAT, TSEncoding.PLAIN, Compressor.SNAPPY) # 插入多条记录 measurements_list_ = [ ["环境温度"], ["环境温度"], ] values_list_ = [ [35.6], [23.8], ] data_types_ = [TSDataType.FLOAT] data_type_list_ = [data_types_, data_types_] device_ids_ = ["root.main.667601E99.01", "root.main.667601E99.01"] session.insert_records( device_ids_, [tostamp_int(timestr1), tostamp_int(timestr2)], measurements_list_, data_type_list_, values_list_ ) print(tostamp_int(timestr1),tostamp_int(timestr2)) # execute sql query statement sql_str = "select * from root.main" with session.execute_query_statement(sql_str) as session_data_set: df = session_data_set.todf() print(df) session.close()
from iotdb.Session import Session # creating session connection. from iotdb.utils.IoTDBConstants import TSDataType, TSEncoding, Compressor import time from datetime import datetime # 字符串时间戳转换为整数时间戳 def tostamp_int(timestr): datetime_obj = datetime.strptime(timestr, "%Y-%m-%d %H:%M:%S.%f") obj_stamp = int(time.mktime(datetime_obj.timetuple()) * 1000.0 + datetime_obj.microsecond / 1000.0) return obj_stamp ip = "192.168.124.26" port_ = "6667" username_ = "root" password_ = "root" session = Session(ip, port_, username_, password_, fetch_size=1024) print(session.get_time_zone()) session.open(False) devid = "667601E99" pointid = "01" kpiid = "环境温度" timestr1 = "2022-08-31 14:18:55.786" ts_path = "root.main.{}.{}.{}".format(devid,pointid,kpiid) # 创建存储组 session.set_storage_group("root.main") # 创建时间序列 session.create_time_series(ts_path, TSDataType.FLOAT, TSEncoding.PLAIN, Compressor.SNAPPY) # execute no sql query statement sql_str = "insert into root.main.667601E99.01(timestamp,{}) values({},{})".format(kpiid,tostamp_int(timestr1),67.8) print(sql_str) session.execute_non_query_statement(sql_str) # execute sql query statement sql_str = "select * from root.main" with session.execute_query_statement(sql_str) as session_data_set: df = session_data_set.todf() print(df) session.close()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。