gRPC Python:lectura y escritura de datos de proceso
Este artículo describe cómo acceder y escribir datos de procesos simples con Python con un AXC F 3152 utilizando gRPC. (https://www.plcnext.help/te/Service_Components/gRPC_Introduction.htm)
Requisito previo
Primero tenemos que preparar los archivos necesarios, fuera del PLC, por ejemplo, en una máquina con Windows.
- Instalar Python 3.9 (3.10 puede causar errores)
- Instale el paquete de Python necesario para generar código a partir de los archivos .proto:
pip install grpcio-tools==1.36.1
- Descargue y descomprima el repositorio que contiene los archivos .proto desde https://github.com/PLCnext/gRPC
Generar _pb2.py y _pb2_grpc.py a partir de archivos .proto
A continuación, debemos generar los archivos python necesarios a partir de los archivos .proto proporcionados. Estos últimos se encuentran en la siguiente carpeta:gRPC-master/protobuf.
Utilice este código para crear una secuencia de comandos de Python en la carpeta gRPC-master, por ejemplo, generate_grpc.py. El guión
- genera los archivos necesarios y los coloca en gRPC-master/pxc_grpc
- adaptar las rutas de importación
import glob
import os
from pathlib import Path
# create the output directory
Path('pxc_grpc').mkdir(parents=True, exist_ok=True)
grpc_command_base = 'python -m grpc_tools.protoc -I./protobuf --python_out=pxc_grpc --grpc_python_out=pxc_grpc '
import_paths = set()
# generate the *_pb2.py and *_pb2_grpc.py files
for filename in glob.iglob('./protobuf/**', recursive=True):
if filename.endswith('.proto'):
# store the import path
path_parts = filename.split(os.sep)
import_paths.add('.'.join(path_parts[1:-1]))
grpc_command = ''.join([grpc_command_base, os.path.join('.', os.path.relpath(filename))])
stream = os.popen(grpc_command)
output = stream.read()
if output != '':
print(''.join(['error/info for file ', os.path.relpath(filename), ' - ', output]))
# get the python files in the base directory
base_pys = set()
for (dirpath, dirnames, filenames) in os.walk('./pxc_grpc'):
for f in filenames:
base_pys.add(f.split('.py')[0])
break
# reformat the stored paths to adapt the import statements
try:
import_paths.remove('')
except:
pass
import_paths = list(import_paths)
import_paths.sort(key=len)
import_paths.reverse()
# adapt the imports
for filename in glob.iglob('./pxc_grpc/**', recursive=True):
if filename.endswith('.py'):
new_lines = []
with open(filename, 'r') as file:
lines = file.readlines()
for line in lines:
if line.startswith('from'):
for import_path in import_paths:
if import_path in line:
line = line.replace(import_path, ''.join(['pxc_grpc.', import_path]), 1)
break
elif line.startswith('import'):
parts = line.split()
if parts[1] in base_pys:
line = line.replace('import', 'from pxc_grpc import')
new_lines.append(line)
with open(filename, 'w') as file:
file.write(''.join(new_lines))
Abra un shell y ejecute el script:pyton generate_grpc.py
Cree un proyecto de demostración de PLCnext
El proyecto que se muestra solo debe demostrar cómo la interfaz de gRPC interactúa con el GDS. Siéntase libre de usar un proyecto existente en su lugar. Para un proyecto individual, debe editar los nombres de los puertos en la siguiente secuencia de comandos de Python en consecuencia, por ejemplo, Arp.Plc.Eclr/MainInstance.strInput
.
Preparar el PLC
Instale pip para administrar sus paquetes de Python:
- Conecte el controlador AXC F 3152 a Internet.
- Ingrese el comando curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py.
- Luego ingrese el comando python3 get-pip.py.
Instale los paquetes necesarios:pip install grpcio protobuf==3.20.0
Cree una carpeta 'grpc_test' en el directorio de proyectos (/opt/plcnext/projects/).
Copie la carpeta 'pxc_grpc', que contiene los archivos Python embalados en 'grpc_test', por ejemplo, con WinSCP.
Cree una secuencia de comandos de Python en la carpeta 'grpc_test' llamada 'grpc_test.py' e inserte el siguiente código:
import grpc
from pxc_grpc.Plc.Gds.IDataAccessService_pb2 import IDataAccessServiceReadSingleRequest, \
IDataAccessServiceReadRequest, IDataAccessServiceWriteSingleRequest, IDataAccessServiceWriteRequest
from pxc_grpc.Plc.Gds.IDataAccessService_pb2_grpc import IDataAccessServiceStub
from pxc_grpc.Plc.Gds.WriteItem_pb2 import WriteItem
def write_single_string(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 19
single_write_request.data.Value.StringValue = value
return stub.WriteSingle(single_write_request)
def write_single_int(stub, port_name, value):
single_write_request = IDataAccessServiceWriteSingleRequest()
single_write_request.data.PortName = port_name
single_write_request.data.Value.TypeCode = 6
single_write_request.data.Value.Int16Value = value
return stub.WriteSingle(single_write_request)
def write_multiple_values(stub):
write_request = IDataAccessServiceWriteRequest()
wi1 = WriteItem()
wi1.PortName = 'Arp.Plc.Eclr/MainInstance.strInput'
wi1.Value.StringValue = "test1"
wi1.Value.TypeCode = 19
wi2 = WriteItem()
wi2.PortName = 'Arp.Plc.Eclr/MainInstance.strInput2'
wi2.Value.StringValue = "test2"
wi2.Value.TypeCode = 19
# add multiple WriteItems at once
write_request.data.extend([wi1, wi2])
# add WriteItems separately
# response1.data.append(wi1)
# response1.data.append(wi2)
return stub.Write(write_request)
def read_single_value(stub, port_name):
single_read_request = IDataAccessServiceReadSingleRequest()
single_read_request.portName=port_name
return stub.ReadSingle(single_read_request)
def read_multiple_values(stub, port_names):
read_request = IDataAccessServiceReadRequest()
read_request.portNames.extend(port_names)
return stub.Read(read_request)
if __name__ == "__main__":
# create channel and stub
channel = grpc.insecure_channel('unix:/run/plcnext/grpc.sock')
stub = IDataAccessServiceStub(channel)
print(write_single_string(stub, 'Arp.Plc.Eclr/MainInstance.strInput', 'test123'))
print(write_single_int(stub, 'Arp.Plc.Eclr/MainInstance.iInput', 18))
print(write_multiple_values(stub))
r = read_single_value(stub, 'Arp.Plc.Eclr/MainInstance.strInput')
print(r)
print(r._ReturnValue.Value.TypeCode)
print(r._ReturnValue.Value.StringValue)
r = read_multiple_values(stub, ['Arp.Plc.Eclr/MainInstance.iInput', 'Arp.Plc.Eclr/MainInstance.strInput'])
for value in r._ReturnValue:
print(value, value.Value.TypeCode)
Conecte su PLC a PLCnext Engineer, descargue el proyecto e inicie la vista en vivo.
Ejecutar el ejemplo
Ahora comienza el ejemplo. Inicie sesión en el PLC a través de ssh y navegue hasta 'grpc_test', luego inicie el script de Python:
cd projects/grpc_test/
python3 grpc_test.py
El gRPC permite la interacción con variables GDS.
Tipos de datos
Para leer y escribir variables, se requiere el tipo de datos, por ejemplo, wi1.Value.TypeCode = 19
. Los tipos se describen en el archivo generado gRPC-master/pxc_grpc/ArpTypes_pb2.py
comenzando en la línea 242:
CT_None = 0
CT_End = 0
CT_Void = 1
CT_Boolean = 2
CT_Char = 3
CT_Int8 = 4
CT_Uint8 = 5
CT_Int16 = 6
CT_Uint16 = 7
CT_Int32 = 8
CT_Uint32 = 9
CT_Int64 = 10
CT_Uint64 = 11
CT_Real32 = 12
CT_Real64 = 13
CT_Struct = 18
CT_String = 19
CT_Utf8String = 19
CT_Array = 20
CT_DateTime = 23
CT_Version = 24
CT_Guid = 25
CT_AnsiString = 26
CT_Object = 28
CT_Utf16String = 30
CT_Stream = 34
CT_Enumerator = 35
CT_SecureString = 36
CT_Enum = 37
CT_Dictionary = 38
CT_SecurityToken = 39
CT_Exception = 40
CT_IecTime = 41
CT_IecTime64 = 42
CT_IecDate = 43
CT_IecDate64 = 44
CT_IecDateTime = 45
CT_IecDateTime64 = 46
CT_IecTimeOfDay = 47
CT_IecTimeOfDay64 = 48
Las variables de valor correspondientes, por ejemplo, r._ReturnValue.Value.StringValue
, podría encontrarse en el mismo archivo, comenzando en la línea 365, por ejemplo, BoolValue
, Int8Value
, StringValue
.
Tecnología Industrial
- Términos y conceptos de la memoria digital
- Apacer:Lanzamiento de las tarjetas CV110-SD y CV110-MSD en todo el mundo
- Apacer:serie SSD SV250 de grado industrial con velocidades de lectura / escritura de 560 y 520 MB / s
- Cómo entender el big data:RTU y aplicaciones de control de procesos
- ¿Qué es el fresado?- Definición, proceso y operaciones
- ¿Qué es la perforación? Definición, proceso y consejos
- ¿Qué es la pulvimetalurgia? Definición y proceso
- ¿Qué es el brochado? - Proceso, trabajo y tipos
- ¿Qué es el mecanizado químico? - Trabajo y proceso
- ¿Qué es el mecanizado ultrasónico? - Trabajo y proceso
- ¿Qué es la soldadura por pulverización? - Proceso y técnicas