importloggingimportpandasaspdimportxml.etree.cElementTreeasETfromopenpnm.ioimportproject_to_dict,_parse_filenamefromopenpnm.utils._miscimportis_transientlogger=logging.getLogger(__name__)_header="""<?xml version="1.0" ?> <!DOCTYPE Xdmf SYSTEM "Xdmf.dtd" []>"""
def project_to_xdmf(project, filename=''):
    r"""
    Saves (transient/steady-state) data from the given project into an
    XDMF file, with the array data stored in companion HDF5 files.

    Parameters
    ----------
    project : Project
        The project to export. Its ``network``, ``algorithms`` and ``name``
        attributes are read here, and the object is passed on to
        ``project_to_dict`` to collect the data arrays.
    filename : str (optional, default is ``''``)
        Name of the output file. If empty, ``project.name`` is used. The
        name is passed through ``_parse_filename`` with ``ext='xmf'``.

    Notes
    -----
    This method only saves the data, not any of the pore-scale models or
    other attributes.

    One HDF5 file is written per time step next to the ``.xmf`` file:
    ``<stem>.hdf`` for steady-state data, ``<stem>#<step>.hdf`` for
    transient data. Arrays whose key contains no ``'#'`` are written only
    to the first time step's file, and the grids of later steps point
    back to that file.

    Arrays with ``object`` or unicode-string dtype are not written to
    HDF5 and are left out of the XDMF description as well.

    """
    import h5py

    def _step_order(label):
        # Numeric labels are ordered by value; anything that does not parse
        # as a number sorts after them, alphabetically.
        try:
            return (0, float(label), label)
        except ValueError:
            return (1, 0.0, label)

    network = project.network
    algs = project.algorithms

    # Check if any of the algorithms has time series
    transient = is_transient(algs)

    if filename == '':
        filename = project.name
    path = _parse_filename(filename=filename, ext='xmf')
    # Path is a pathlib object, so slice it up as needed. The parent folder
    # is resolved up front so it is available even when no time step exists.
    path_p = path.parent
    fname_xdf = path.name
    d = project_to_dict(project=project, flatten=False,
                        categorize_by=['element', 'data'])
    D = pd.json_normalize(d, sep='.').to_dict(orient='records')[0]
    D = {key.replace('.', '/'): value for key, value in D.items()}

    # Identify time steps. They are sorted so that the order of the grids,
    # and the choice of the file holding the static data, do not depend on
    # set iteration order.
    if transient:
        labels = {key.split('#')[1] for key in D if '#' in key}
        t_steps = sorted(labels, key=_step_order)
    else:
        # If steady-state, define '0' time step
        t_steps = ['0']
    t_grid = create_grid(Name="TimeSeries", GridType="Collection",
                         CollectionType="Temporal")

    # Setup xdmf file
    root = create_root('Xdmf')
    domain = create_domain()

    # Iterate over time steps present
    for i, t_step in enumerate(t_steps):
        # Define the hdf file
        if transient:
            fname_hdf = path.stem + '#' + t_step + ".hdf"
        else:
            fname_hdf = path.stem + ".hdf"

        # The context manager closes the HDF5 file even if an error occurs
        # while this time step is being written.
        with h5py.File(path_p.joinpath(fname_hdf), "w") as f:
            # Add coordinate and connection information to top of HDF5 file
            f["coordinates"] = network["pore.coords"]
            f["connections"] = network["throat.conns"]

            # geometry coordinates
            row, col = f["coordinates"].shape
            dims = ' '.join((str(row), str(col)))
            hdf_loc = fname_hdf + ":coordinates"
            geo_data = create_data_item(value=hdf_loc, Dimensions=dims,
                                        Format='HDF', DataType="Float")
            geo = create_geometry(GeometryType="XYZ")
            geo.append(geo_data)

            # topology connections
            row, col = f["connections"].shape
            dims = ' '.join((str(row), str(col)))
            hdf_loc = fname_hdf + ":connections"
            topo_data = create_data_item(value=hdf_loc, Dimensions=dims,
                                         Format="HDF", NumberType="Int")
            topo = create_topology(TopologyType="Polyline",
                                   NodesPerElement=str(2),
                                   NumberOfElements=str(row))
            topo.append(topo_data)

            # Make HDF5 file with all datasets, and no groups
            for item in list(D.keys()):
                arr = D[item]
                if arr.dtype == 'O':
                    logger.warning("%s has dtype object, will not write to file",
                                   item)
                    del D[item]
                elif arr.dtype.kind == 'U':
                    # String arrays are not written to HDF5, so drop them to
                    # avoid an XDMF attribute that references a dataset
                    # which does not exist.
                    del D[item]
                elif '#' in item:
                    if t_step == item.split('#')[1]:
                        f.create_dataset(name='/' + item.split('#')[0] + '#t',
                                         shape=arr.shape, dtype=arr.dtype,
                                         data=arr, compression="gzip")
                elif i == 0:
                    # Static data is stored once, in the first file only
                    f.create_dataset(name='/' + item, shape=arr.shape,
                                     dtype=arr.dtype, data=arr,
                                     compression="gzip")

            # Create a grid
            grid = create_grid(Name=t_step, GridType="Uniform")
            grid.append(create_time(mode='Single', Value=t_step))

            # Add pore and throat properties
            for item in list(D.keys()):
                if item in ('coordinates', 'connections'):
                    continue
                dims = ' '.join(str(n) for n in D[item].shape)
                if '#' in item:
                    if t_step != item.split('#')[1]:
                        # Belongs to a different time step
                        continue
                    label = item.split('#')[0] + '#t'
                    hdf_loc = fname_hdf + ":" + label
                elif i == 0:
                    label = item
                    hdf_loc = fname_hdf + ":" + label
                else:
                    # Static data lives in the first time step's file
                    label = item
                    hdf_loc = (path.stem + '#' + t_steps[0] + ".hdf"
                               + ":" + label)
                attr = create_data_item(value=hdf_loc, Dimensions=dims,
                                        Format='HDF', Precision='8',
                                        DataType='Float')
                center = "Cell" if 'throat' in label else "Node"
                el_attr = create_attribute(Name=label.replace('/', ' | '),
                                           Center=center,
                                           AttributeType='Scalar')
                el_attr.append(attr)
                grid.append(el_attr)

            grid.append(topo)
            grid.append(geo)
            t_grid.append(grid)

    domain.append(t_grid)
    root.append(domain)

    with open(path_p.joinpath(fname_xdf), 'w') as file:
        file.write(_header)
        file.write(ET.tostring(root).decode("utf-8"))
def create_root(Name):
    r"""Return a new, empty XML element with tag ``Name``."""
    return ET.Element(Name)


def create_domain():
    r"""Return a new, empty ``Domain`` element."""
    return ET.Element("Domain")


def create_geometry(GeometryType, **attribs):
    r"""
    Return a ``Geometry`` element.

    Parameters
    ----------
    GeometryType : str
        Value of the ``GeometryType`` attribute.
    **attribs
        Additional XML attributes to set on the element.

    """
    element = ET.Element('Geometry')
    element.attrib.update({'GeometryType': GeometryType})
    element.attrib.update(attribs)
    return element


def create_topology(TopologyType, NumberOfElements, **attribs):
    r"""
    Return a ``Topology`` element.

    Parameters
    ----------
    TopologyType : str
        Value of the ``TopologyType`` attribute.
    NumberOfElements : str
        Value of the ``NumberOfElements`` attribute.
    **attribs
        Additional XML attributes. ``NodesPerElement`` is required when
        ``TopologyType`` is ``'Polyline'``.

    Raises
    ------
    ValueError
        If ``TopologyType`` is ``'Polyline'`` and ``NodesPerElement`` is
        not given.

    """
    # Validate before building anything
    if TopologyType in ['Polyline'] and 'NodesPerElement' not in attribs:
        raise ValueError('NodesPerElement must be specified')
    element = ET.Element('Topology')
    element.attrib.update({'TopologyType': TopologyType,
                           'NumberOfElements': NumberOfElements})
    element.attrib.update(attribs)
    return element


def create_attribute(Name, **attribs):
    r"""
    Return an ``Attribute`` element.

    Parameters
    ----------
    Name : str
        Value of the ``Name`` attribute.
    **attribs
        Additional XML attributes to set on the element.

    """
    element = ET.Element('Attribute')
    element.attrib.update({'Name': Name})
    element.attrib.update(attribs)
    return element


def create_time(mode='Single', Value=None):
    r"""
    Return a ``Time`` element.

    Parameters
    ----------
    mode : str
        Only ``'Single'`` causes the ``Value`` attribute to be set.
    Value : str or number (optional)
        The time value. It is stored as a string. ``None`` and the empty
        string leave the element without a ``Value`` attribute.

    """
    element = ET.Element('Time')
    # Test against None/'' explicitly so that a numeric time of 0 is kept;
    # convert to str because XML attribute values must be strings.
    if mode == 'Single' and Value is not None and Value != '':
        element.attrib['Value'] = str(Value)
    return element


def create_grid(Name, GridType, **attribs):
    r"""
    Return a ``Grid`` element.

    Parameters
    ----------
    Name : str
        Value of the ``Name`` attribute.
    GridType : str
        Value of the ``GridType`` attribute.
    **attribs
        Additional XML attributes. ``Section`` is kept only when
        ``GridType`` is ``'Subset'`` and a value was supplied.

    """
    element = ET.Element('Grid')
    # 'Section' is inserted as a placeholder so it keeps its position right
    # after 'GridType' when the caller supplies it.
    element.attrib.update({'Name': Name, 'GridType': GridType,
                           'Section': None})
    element.attrib.update(attribs)
    # A None placeholder must never survive: it cannot be serialized to XML.
    if GridType != 'Subset' or element.attrib['Section'] is None:
        del element.attrib['Section']
    return element


def create_data_item(value, Dimensions, **attribs):
    r"""
    Return a ``DataItem`` element whose text is ``value``.

    Parameters
    ----------
    value : str
        Text content of the element (e.g. ``'file.hdf:dataset'``).
    Dimensions : str
        Value of the ``Dimensions`` attribute.
    **attribs
        XML attributes that override or extend the defaults
        (``ItemType='Uniform'``, ``Format='XML'``, ``DataType='Float'``,
        ``Precision='4'``, ``Rank='1'``). ``Function`` is only written
        when a value is supplied.

    """
    element = ET.Element('DataItem')
    element.attrib.update({'ItemType': "Uniform",
                           'Format': "XML",
                           'DataType': "Float",
                           'Precision': "4",
                           'Rank': "1",
                           'Dimensions': Dimensions,
                           'Function': None})
    element.attrib.update(attribs)
    # Drop the placeholder when no Function was requested
    if element.attrib['Function'] is None:
        del element.attrib['Function']
    element.text = value
    return element